test(sessds): Fix failing tests

This commit is contained in:
ieQu1 2024-01-12 02:06:32 +01:00
parent e7b03cdc59
commit 39857626ce
4 changed files with 64 additions and 55 deletions

View File

@ -234,7 +234,7 @@ info(mqueue_dropped, _Session) ->
% info(awaiting_rel, #sessmem{awaiting_rel = AwaitingRel}) ->
% AwaitingRel;
info(awaiting_rel_cnt, #{s := S}) ->
seqno_diff(?QOS_2, ?dup(?QOS_2), ?committed(?QOS_2), S);
seqno_diff(?QOS_2, ?rec, ?committed(?QOS_2), S);
info(awaiting_rel_max, #{props := Conf}) ->
maps:get(max_awaiting_rel, Conf);
info(await_rel_timeout, #{props := Conf}) ->
@ -602,6 +602,7 @@ session_ensure_new(Id, ConnInfo, Conf) ->
?committed(?QOS_1),
?next(?QOS_2),
?dup(?QOS_2),
?rec,
?committed(?QOS_2)
]
),
@ -742,6 +743,7 @@ process_batch(
Comm2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
Dup1 = emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_1), S),
Dup2 = emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_2), S),
Rec = emqx_persistent_session_ds_state:get_seqno(?rec, S),
Subs = emqx_persistent_session_ds_state:get_subscriptions(S),
Msgs = [
Msg
@ -784,11 +786,18 @@ process_batch(
?QOS_2 when SeqNoQos2 =< Comm2 ->
%% QoS2 message has been PUBCOMP'ed by the client, ignore:
Acc;
?QOS_2 when SeqNoQos2 =< Dup2 ->
?QOS_2 when SeqNoQos2 =< Rec ->
%% QoS2 message has been PUBREC'ed by the client, resend PUBREL:
emqx_persistent_session_ds_inflight:push({pubrel, SeqNoQos2}, Acc);
?QOS_2 when SeqNoQos2 =< Dup2 ->
%% QoS2 message has been sent, but we haven't received PUBREC.
%%
%% TODO: According to the MQTT standard 4.3.3:
%% DUP flag is never set for QoS2 messages? We
%% do so for mem sessions, though.
Msg1 = emqx_message:set_flag(dup, true, Msg),
emqx_persistent_session_ds_inflight:push({SeqNoQos2, Msg1}, Acc);
?QOS_2 ->
%% MQTT standard 4.3.3: DUP flag is never set for QoS2 messages:
emqx_persistent_session_ds_inflight:push({SeqNoQos2, Msg}, Acc)
end,
SeqNoQos1,
@ -821,13 +830,10 @@ do_drain_buffer(Inflight0, S0, Acc) ->
case Msg#message.qos of
?QOS_0 ->
do_drain_buffer(Inflight, S0, [{undefined, Msg} | Acc]);
?QOS_1 ->
S = emqx_persistent_session_ds_state:put_seqno(?dup(?QOS_1), SeqNo, S0),
Publish = {seqno_to_packet_id(?QOS_1, SeqNo), Msg},
do_drain_buffer(Inflight, S, [Publish | Acc]);
?QOS_2 ->
Publish = {seqno_to_packet_id(?QOS_2, SeqNo), Msg},
do_drain_buffer(Inflight, S0, [Publish | Acc])
Qos ->
S = emqx_persistent_session_ds_state:put_seqno(?dup(Qos), SeqNo, S0),
Publish = {seqno_to_packet_id(Qos, SeqNo), Msg},
do_drain_buffer(Inflight, S, [Publish | Acc])
end
end.
@ -898,7 +904,7 @@ commit_seqno(Track, PacketId, Session = #{id := SessionId, s := S}) ->
MinTrack = ?committed(?QOS_1),
MaxTrack = ?next(?QOS_1);
pubrec ->
MinTrack = ?dup(?QOS_2),
MinTrack = ?rec,
MaxTrack = ?next(?QOS_2);
pubcomp ->
MinTrack = ?committed(?QOS_2),

View File

@ -28,25 +28,22 @@
%%%%% Session sequence numbers:
%%
%% -----|----------|----------|------> seqno
%% | | |
%% committed dup next
%% -----|----------|-----|-----|------> seqno
%% | | | |
%% committed dup rec next
% (Qos2)
%% Seqno becomes committed after receiving PUBACK for QoS1 or PUBCOMP
%% for QoS2.
-define(committed(QOS), QOS).
%% Seqno becomes dup:
%% Seqno becomes dup after broker sends QoS1 or QoS2 message to the
%% client. Upon session reconnect, messages with seqno in the
%% committed..dup range are retransmitted with DUP flag.
%%
%% 1. After broker sends QoS1 message to the client. Upon session
%% reconnect, QoS1 messages with seqno in the committed..dup range are
%% retransmitted with DUP flag.
%%
%% 2. After it receives PUBREC from the client for the QoS2 message.
%% Upon session reconnect, PUBREL messages for QoS2 messages with
%% seqno in committed..dup are retransmitted.
-define(dup(QOS), (10 + QOS)).
-define(rec, 22).
%% Last seqno assigned to a message.
-define(next(QOS), (20 + QOS)).
-define(next(QOS), (30 + QOS)).
%%%%% State of the stream:
-record(ifs, {

View File

@ -101,6 +101,7 @@
| ?committed(?QOS_1)
| ?next(?QOS_2)
| ?dup(?QOS_2)
| ?rec
| ?committed(?QOS_2).
-opaque t() :: #{

View File

@ -36,7 +36,7 @@ all() ->
% NOTE
% Tests are disabled while existing session persistence impl is being
% phased out.
%%{group, persistence_disabled},
{group, persistence_disabled},
{group, persistence_enabled}
].
@ -54,10 +54,9 @@ all() ->
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
TCsNonGeneric = [t_choose_impl],
% {group, quic}, {group, ws}],
TCGroups = [{group, tcp}],
TCGroups = [{group, tcp}, {group, quic}, {group, ws}],
[
%% {persistence_disabled, TCGroups},
{persistence_disabled, TCGroups},
{persistence_enabled, TCGroups},
{tcp, [], TCs},
{quic, [], TCs -- TCsNonGeneric},
@ -677,6 +676,7 @@ t_publish_many_while_client_is_gone_qos1(Config) ->
),
NAcked = 4,
?assert(NMsgs1 >= NAcked),
[ok = emqtt:puback(Client1, PktId) || #{packet_id := PktId} <- lists:sublist(Msgs1, NAcked)],
%% Ensure that PUBACKs are propagated to the channel.
@ -690,17 +690,18 @@ t_publish_many_while_client_is_gone_qos1(Config) ->
#mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M9">>, qos = 1},
#mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M10">>, qos = 1},
#mqtt_msg{topic = <<"msg/feed/friend">>, payload = <<"M11">>, qos = 1},
#mqtt_msg{topic = <<"msg/feed/me2">>, payload = <<"M12">>, qos = 1}
#mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M12">>, qos = 1}
],
ok = publish_many(Pubs2),
NPubs2 = length(Pubs2),
%% Now reconnect with auto ack to make sure all streams are
%% replayed till the end:
{ok, Client2} = emqtt:start_link([
{proto_ver, v5},
{clientid, ClientId},
{properties, #{'Session-Expiry-Interval' => 30}},
{clean_start, false},
{auto_ack, false}
{clean_start, false}
| Config
]),
@ -717,9 +718,9 @@ t_publish_many_while_client_is_gone_qos1(Config) ->
ct:pal("Msgs2 = ~p", [Msgs2]),
?assert(NMsgs2 < NPubs, {NMsgs2, '<', NPubs}),
%% ?assert(NMsgs2 > NPubs2, {NMsgs2, '>', NPubs2}),
%% ?assert(NMsgs2 >= NPubs - NAcked, Msgs2),
NSame = max(0, NMsgs2 - NPubs2),
?assert(NMsgs2 > NPubs2, {NMsgs2, '>', NPubs2}),
?assert(NMsgs2 >= NPubs - NAcked, Msgs2),
NSame = NMsgs2 - NPubs2,
?assert(
lists:all(fun(#{dup := Dup}) -> Dup end, lists:sublist(Msgs2, NSame))
),
@ -780,6 +781,11 @@ t_publish_many_while_client_is_gone(Config) ->
%% for its subscriptions after the client dies or reconnects, in addition
%% to PUBRELs for the messages it has PUBRECed. While client must send
%% PUBACKs and PUBRECs in order, those orders are independent of each other.
%%
%% Developer's note: for simplicity we publish all messages to the
%% same topic, since persistent session ds may reorder messages
%% that belong to different streams, and this particular test is
%% very sensitive the order.
ClientId = ?config(client_id, Config),
ConnFun = ?config(conn_fun, Config),
ClientOpts = [
@ -792,20 +798,18 @@ t_publish_many_while_client_is_gone(Config) ->
{ok, Client1} = emqtt:start_link([{clean_start, true} | ClientOpts]),
{ok, _} = emqtt:ConnFun(Client1),
{ok, _, [?QOS_1]} = emqtt:subscribe(Client1, <<"t/+/foo">>, ?QOS_1),
{ok, _, [?QOS_2]} = emqtt:subscribe(Client1, <<"msg/feed/#">>, ?QOS_2),
{ok, _, [?QOS_2]} = emqtt:subscribe(Client1, <<"loc/+/+/+">>, ?QOS_2),
{ok, _, [?QOS_2]} = emqtt:subscribe(Client1, <<"t">>, ?QOS_2),
Pubs1 = [
#mqtt_msg{topic = <<"t/42/foo">>, payload = <<"M1">>, qos = 1},
#mqtt_msg{topic = <<"t/42/foo">>, payload = <<"M2">>, qos = 1},
#mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M3">>, qos = 2},
#mqtt_msg{topic = <<"loc/1/2/42">>, payload = <<"M4">>, qos = 2},
#mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M5">>, qos = 2},
#mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M6">>, qos = 1},
#mqtt_msg{topic = <<"loc/3/4/5">>, payload = <<"M7">>, qos = 2},
#mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M8">>, qos = 1},
#mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M9">>, qos = 2}
#mqtt_msg{topic = <<"t">>, payload = <<"M1">>, qos = 1},
#mqtt_msg{topic = <<"t">>, payload = <<"M2">>, qos = 1},
#mqtt_msg{topic = <<"t">>, payload = <<"M3">>, qos = 2},
#mqtt_msg{topic = <<"t">>, payload = <<"M4">>, qos = 2},
#mqtt_msg{topic = <<"t">>, payload = <<"M5">>, qos = 2},
#mqtt_msg{topic = <<"t">>, payload = <<"M6">>, qos = 1},
#mqtt_msg{topic = <<"t">>, payload = <<"M7">>, qos = 2},
#mqtt_msg{topic = <<"t">>, payload = <<"M8">>, qos = 1},
#mqtt_msg{topic = <<"t">>, payload = <<"M9">>, qos = 2}
],
ok = publish_many(Pubs1),
NPubs1 = length(Pubs1),
@ -827,7 +831,7 @@ t_publish_many_while_client_is_gone(Config) ->
[PktId || #{qos := 1, packet_id := PktId} <- Msgs1]
),
%% PUBREC first `NRecs` QoS 2 messages.
%% PUBREC first `NRecs` QoS 2 messages (up to "M5")
NRecs = 3,
PubRecs1 = lists:sublist([PktId || #{qos := 2, packet_id := PktId} <- Msgs1], NRecs),
lists:foreach(
@ -851,9 +855,9 @@ t_publish_many_while_client_is_gone(Config) ->
maybe_kill_connection_process(ClientId, Config),
Pubs2 = [
#mqtt_msg{topic = <<"loc/3/4/5">>, payload = <<"M10">>, qos = 2},
#mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M11">>, qos = 1},
#mqtt_msg{topic = <<"msg/feed/friend">>, payload = <<"M12">>, qos = 2}
#mqtt_msg{topic = <<"t">>, payload = <<"M10">>, qos = 2},
#mqtt_msg{topic = <<"t">>, payload = <<"M11">>, qos = 1},
#mqtt_msg{topic = <<"t">>, payload = <<"M12">>, qos = 2}
],
ok = publish_many(Pubs2),
NPubs2 = length(Pubs2),
@ -886,8 +890,8 @@ t_publish_many_while_client_is_gone(Config) ->
Msgs2Dups
),
%% Now complete all yet incomplete QoS 2 message flows instead.
PubRecs2 = [PktId || #{qos := 2, packet_id := PktId} <- Msgs2],
%% Ack more messages:
PubRecs2 = lists:sublist([PktId || #{qos := 2, packet_id := PktId} <- Msgs2], 2),
lists:foreach(
fun(PktId) -> ok = emqtt:pubrec(Client2, PktId) end,
PubRecs2
@ -903,6 +907,7 @@ t_publish_many_while_client_is_gone(Config) ->
%% PUBCOMP every PUBREL.
PubComps = [PktId || {pubrel, #{packet_id := PktId}} <- PubRels1 ++ PubRels2],
ct:pal("PubComps: ~p", [PubComps]),
lists:foreach(
fun(PktId) -> ok = emqtt:pubcomp(Client2, PktId) end,
PubComps
@ -910,19 +915,19 @@ t_publish_many_while_client_is_gone(Config) ->
%% Ensure that PUBCOMPs are propagated to the channel.
pong = emqtt:ping(Client2),
%% Reconnect for the last time
ok = disconnect_client(Client2),
maybe_kill_connection_process(ClientId, Config),
{ok, Client3} = emqtt:start_link([{clean_start, false} | ClientOpts]),
{ok, _} = emqtt:ConnFun(Client3),
%% Only the last unacked QoS 1 message should be retransmitted.
%% Check that the messages are retransmitted with DUP=1:
Msgs3 = receive_messages(NPubs, _Timeout = 2000),
ct:pal("Msgs3 = ~p", [Msgs3]),
?assertMatch(
[#{topic := <<"t/100/foo">>, payload := <<"M11">>, qos := 1, dup := true}],
Msgs3
[<<"M10">>, <<"M11">>, <<"M12">>],
[I || #{payload := I} <- Msgs3]
),
ok = disconnect_client(Client3).