From b812db1e3c659c1e7cc650466ec55a30d1b66d6c Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 9 Nov 2023 16:50:45 +0100 Subject: [PATCH 1/2] fix(ds): Fix packet id -> sequence number translation --- .../emqx_persistent_message_ds_replayer.erl | 116 ++++++++++++++---- apps/emqx/src/emqx_persistent_session_ds.erl | 2 +- 2 files changed, 94 insertions(+), 24 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index 3964ee4e3..bd64db8b8 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -26,9 +26,11 @@ -export_type([inflight/0]). +-include_lib("emqx/include/logger.hrl"). -include("emqx_persistent_session_ds.hrl"). -ifdef(TEST). +-include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). -endif. @@ -65,9 +67,17 @@ new() -> #inflight{}. -spec next_packet_id(inflight()) -> {emqx_types:packet_id(), inflight()}. -next_packet_id(Inflight0 = #inflight{next_seqno = LastSeqno}) -> - Inflight = Inflight0#inflight{next_seqno = LastSeqno + 1}, - {seqno_to_packet_id(LastSeqno), Inflight}. +next_packet_id(Inflight0 = #inflight{next_seqno = LastSeqNo}) -> + Inflight = Inflight0#inflight{next_seqno = LastSeqNo + 1}, + case LastSeqNo rem 16#10000 of + 0 -> + %% We skip sequence numbers that lead to PacketId = 0 to + %% simplify math. Note: it leads to occasional gaps in the + %% sequence numbers. + next_packet_id(Inflight); + PacketId -> + {PacketId, Inflight} + end. -spec replay(emqx_persistent_session_ds:id(), inflight()) -> emqx_session:replies(). @@ -83,8 +93,20 @@ commit_offset( acked_seqno = AckedSeqno0, next_seqno = NextSeqNo, offset_ranges = Ranges0 } ) -> - AckedSeqno = packet_id_to_seqno(NextSeqNo, PacketId), - true = AckedSeqno0 < AckedSeqno, + AckedSeqno = + case packet_id_to_seqno(NextSeqNo, PacketId) of + N when N > AckedSeqno0; AckedSeqno0 =:= 0 -> + N; + OutOfRange -> + ?SLOG(warning, #{ + msg => "out-of-order_ack", + prev_seqno => AckedSeqno0, + acked_seqno => OutOfRange, + next_seqno => NextSeqNo, + packet_id => PacketId + }), + AckedSeqno0 + end, Ranges = lists:filter( fun(#range{stream = Stream, last = LastSeqno, iterator_next = ItNext}) -> case LastSeqno =< AckedSeqno of @@ -140,18 +162,17 @@ fetch(SessionId, Inflight0, [#ds_stream{stream = Stream} | Streams], N, Publishe #inflight{next_seqno = FirstSeqNo, offset_ranges = Ranges0} = Inflight0, ItBegin = get_last_iterator(SessionId, Stream, Ranges0), {ok, ItEnd, Messages} = emqx_ds:next(ItBegin, N), - {Publishes, Inflight1} = + {NMessages, Publishes, Inflight1} = lists:foldl( - fun(Msg, {PubAcc0, InflightAcc0}) -> + fun(Msg, {N0, PubAcc0, InflightAcc0}) -> {PacketId, InflightAcc} = next_packet_id(InflightAcc0), PubAcc = [{PacketId, Msg} | PubAcc0], - {PubAcc, InflightAcc} + {N0 + 1, PubAcc, InflightAcc} end, - {Publishes0, Inflight0}, + {0, Publishes0, Inflight0}, Messages ), #inflight{next_seqno = LastSeqNo} = Inflight1, - NMessages = LastSeqNo - FirstSeqNo, case NMessages > 0 of true -> Range = #range{ @@ -185,25 +206,22 @@ get_iterator(SessionId, Stream) -> get_streams(SessionId) -> mnesia:dirty_read(?SESSION_STREAM_TAB, SessionId). -%% Packet ID as defined by MQTT protocol is a 16-bit integer in range -%% 1..FFFF. This function translates internal session sequence number -%% to MQTT packet ID by chopping off most significant bits and adding -%% 1. This assumes that there's never more FFFF in-flight packets at -%% any time: --spec seqno_to_packet_id(non_neg_integer()) -> emqx_types:packet_id(). -seqno_to_packet_id(Counter) -> - Counter rem 16#ffff + 1. - %% Reconstruct session counter by adding most significant bits from %% the current counter to the packet id. -spec packet_id_to_seqno(non_neg_integer(), emqx_types:packet_id()) -> non_neg_integer(). packet_id_to_seqno(NextSeqNo, PacketId) -> - N = ((NextSeqNo bsr 16) bsl 16) + PacketId, - case N > NextSeqNo of - true -> N - 16#10000; - false -> N + Epoch = NextSeqNo bsr 16, + case packet_id_to_seqno_(Epoch, PacketId) of + N when N =< NextSeqNo -> + N; + _ -> + packet_id_to_seqno_(Epoch - 1, PacketId) end. +-spec packet_id_to_seqno_(non_neg_integer(), emqx_types:packet_id()) -> non_neg_integer(). +packet_id_to_seqno_(Epoch, PacketId) -> + (Epoch bsl 16) + PacketId. + -spec shuffle([A]) -> [A]. shuffle(L0) -> L1 = lists:map( @@ -215,3 +233,55 @@ shuffle(L0) -> L2 = lists:sort(L1), {_, L} = lists:unzip(L2), L. + +-ifdef(TEST). + +%% This test only tests boundary conditions (to make sure property-based test didn't skip them): +packet_id_to_seqno_test() -> + %% Packet ID = 1; first epoch: + ?assertEqual(1, packet_id_to_seqno(1, 1)), + ?assertEqual(1, packet_id_to_seqno(10, 1)), + ?assertEqual(1, packet_id_to_seqno(1 bsl 16 - 1, 1)), + %% Packet ID = 1; second and 3rd epochs: + ?assertEqual(1 bsl 16 + 1, packet_id_to_seqno(1 bsl 16 + 1, 1)), + ?assertEqual(2 bsl 16 + 1, packet_id_to_seqno(2 bsl 16 + 1, 1)), + %% Packet ID = 16#ffff: + PID = 1 bsl 16 - 1, + ?assertEqual(PID, packet_id_to_seqno(PID, PID)), + ?assertEqual(PID, packet_id_to_seqno(1 bsl 16, PID)), + ?assertEqual(1 bsl 16 + PID, packet_id_to_seqno(2 bsl 16, PID)), + ok. + +packet_id_to_seqno_test_() -> + Opts = [{numtests, 1000}, {to_file, user}], + {timeout, 30, fun() -> ?assert(proper:quickcheck(packet_id_to_seqno_prop(), Opts)) end}. + +packet_id_to_seqno_prop() -> + ?FORALL( + NextSeqNo, + next_seqno_gen(), + ?FORALL( + SeqNo, + seqno_gen(NextSeqNo), + begin + PacketId = SeqNo rem 16#10000, + ?assertEqual(SeqNo, packet_id_to_seqno(NextSeqNo, PacketId)), + true + end + ) + ). + +next_seqno_gen() -> + ?LET( + {Epoch, Offset}, + {non_neg_integer(), non_neg_integer()}, + Epoch bsl 16 + Offset + ). + +seqno_gen(NextSeqNo) -> + WindowSize = 1 bsl 16 - 2, + Min = max(0, NextSeqNo - WindowSize), + Max = max(0, NextSeqNo - 1), + range(Min, Max). + +-endif. diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index bf16567fd..992b032a4 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -342,7 +342,7 @@ deliver(_ClientInfo, _Delivers, Session) -> -spec handle_timeout(clientinfo(), _Timeout, session()) -> {ok, replies(), session()} | {ok, replies(), timeout(), session()}. handle_timeout(_ClientInfo, pull, Session = #{id := Id, inflight := Inflight0}) -> - WindowSize = 100, + WindowSize = 1000, {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll(Id, Inflight0, WindowSize), %% TODO: make these values configurable: Timeout = From 5aa9d026df82b8515ee5a3807c3c24979369701a Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 9 Nov 2023 21:16:20 +0100 Subject: [PATCH 2/2] fix(ds): Apply review remarks Co-authored-by: Thales Macedo Garitezi --- apps/emqx/src/emqx_persistent_message_ds_replayer.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index bd64db8b8..3e8aa71cf 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -242,8 +242,10 @@ packet_id_to_seqno_test() -> ?assertEqual(1, packet_id_to_seqno(1, 1)), ?assertEqual(1, packet_id_to_seqno(10, 1)), ?assertEqual(1, packet_id_to_seqno(1 bsl 16 - 1, 1)), + ?assertEqual(1, packet_id_to_seqno(1 bsl 16, 1)), %% Packet ID = 1; second and 3rd epochs: ?assertEqual(1 bsl 16 + 1, packet_id_to_seqno(1 bsl 16 + 1, 1)), + ?assertEqual(1 bsl 16 + 1, packet_id_to_seqno(2 bsl 16, 1)), ?assertEqual(2 bsl 16 + 1, packet_id_to_seqno(2 bsl 16 + 1, 1)), %% Packet ID = 16#ffff: PID = 1 bsl 16 - 1, @@ -279,7 +281,7 @@ next_seqno_gen() -> ). seqno_gen(NextSeqNo) -> - WindowSize = 1 bsl 16 - 2, + WindowSize = 1 bsl 16 - 1, Min = max(0, NextSeqNo - WindowSize), Max = max(0, NextSeqNo - 1), range(Min, Max).