diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index d86ca84ad..1ef4ea293 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -26,9 +26,11 @@ -export_type([inflight/0]). +-include_lib("emqx/include/logger.hrl"). -include("emqx_persistent_session_ds.hrl"). -ifdef(TEST). +-include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). -endif. @@ -65,9 +67,17 @@ new() -> #inflight{}. -spec next_packet_id(inflight()) -> {emqx_types:packet_id(), inflight()}. -next_packet_id(Inflight0 = #inflight{next_seqno = LastSeqno}) -> - Inflight = Inflight0#inflight{next_seqno = LastSeqno + 1}, - {seqno_to_packet_id(LastSeqno), Inflight}. +next_packet_id(Inflight0 = #inflight{next_seqno = LastSeqNo}) -> + Inflight = Inflight0#inflight{next_seqno = LastSeqNo + 1}, + case LastSeqNo rem 16#10000 of + 0 -> + %% We skip sequence numbers that lead to PacketId = 0 to + %% simplify math. Note: it leads to occasional gaps in the + %% sequence numbers. + next_packet_id(Inflight); + PacketId -> + {PacketId, Inflight} + end. -spec replay(emqx_persistent_session_ds:id(), inflight()) -> emqx_session:replies(). @@ -83,8 +93,20 @@ commit_offset( acked_seqno = AckedSeqno0, next_seqno = NextSeqNo, offset_ranges = Ranges0 } ) -> - AckedSeqno = packet_id_to_seqno(NextSeqNo, PacketId), - true = AckedSeqno0 < AckedSeqno, + AckedSeqno = + case packet_id_to_seqno(NextSeqNo, PacketId) of + N when N > AckedSeqno0; AckedSeqno0 =:= 0 -> + N; + OutOfRange -> + ?SLOG(warning, #{ + msg => "out-of-order_ack", + prev_seqno => AckedSeqno0, + acked_seqno => OutOfRange, + next_seqno => NextSeqNo, + packet_id => PacketId + }), + AckedSeqno0 + end, Ranges = lists:filter( fun(#range{stream = Stream, last = LastSeqno, iterator_next = ItNext}) -> case LastSeqno =< AckedSeqno of @@ -140,18 +162,17 @@ fetch(SessionId, Inflight0, [Stream | Streams], N, Publishes0) -> #inflight{next_seqno = FirstSeqNo, offset_ranges = Ranges0} = Inflight0, ItBegin = get_last_iterator(SessionId, Stream, Ranges0), {ok, ItEnd, Messages} = emqx_ds:next(ItBegin, N), - {Publishes, Inflight1} = + {NMessages, Publishes, Inflight1} = lists:foldl( - fun(Msg, {PubAcc0, InflightAcc0}) -> + fun(Msg, {N0, PubAcc0, InflightAcc0}) -> {PacketId, InflightAcc} = next_packet_id(InflightAcc0), PubAcc = [{PacketId, Msg} | PubAcc0], - {PubAcc, InflightAcc} + {N0 + 1, PubAcc, InflightAcc} end, - {Publishes0, Inflight0}, + {0, Publishes0, Inflight0}, Messages ), #inflight{next_seqno = LastSeqNo} = Inflight1, - NMessages = LastSeqNo - FirstSeqNo, case NMessages > 0 of true -> Range = #range{ @@ -193,25 +214,22 @@ get_streams(SessionId) -> mnesia:dirty_read(?SESSION_STREAM_TAB, SessionId) ). -%% Packet ID as defined by MQTT protocol is a 16-bit integer in range -%% 1..FFFF. This function translates internal session sequence number -%% to MQTT packet ID by chopping off most significant bits and adding -%% 1. This assumes that there's never more FFFF in-flight packets at -%% any time: --spec seqno_to_packet_id(non_neg_integer()) -> emqx_types:packet_id(). -seqno_to_packet_id(Counter) -> - Counter rem 16#ffff + 1. - %% Reconstruct session counter by adding most significant bits from %% the current counter to the packet id. -spec packet_id_to_seqno(non_neg_integer(), emqx_types:packet_id()) -> non_neg_integer(). packet_id_to_seqno(NextSeqNo, PacketId) -> - N = ((NextSeqNo bsr 16) bsl 16) + PacketId, - case N > NextSeqNo of - true -> N - 16#10000; - false -> N + Epoch = NextSeqNo bsr 16, + case packet_id_to_seqno_(Epoch, PacketId) of + N when N =< NextSeqNo -> + N; + _ -> + packet_id_to_seqno_(Epoch - 1, PacketId) end. +-spec packet_id_to_seqno_(non_neg_integer(), emqx_types:packet_id()) -> non_neg_integer(). +packet_id_to_seqno_(Epoch, PacketId) -> + (Epoch bsl 16) + PacketId. + -spec shuffle([A]) -> [A]. shuffle(L0) -> L1 = lists:map( @@ -223,3 +241,57 @@ shuffle(L0) -> L2 = lists:sort(L1), {_, L} = lists:unzip(L2), L. + +-ifdef(TEST). + +%% This test only tests boundary conditions (to make sure property-based test didn't skip them): +packet_id_to_seqno_test() -> + %% Packet ID = 1; first epoch: + ?assertEqual(1, packet_id_to_seqno(1, 1)), + ?assertEqual(1, packet_id_to_seqno(10, 1)), + ?assertEqual(1, packet_id_to_seqno(1 bsl 16 - 1, 1)), + ?assertEqual(1, packet_id_to_seqno(1 bsl 16, 1)), + %% Packet ID = 1; second and 3rd epochs: + ?assertEqual(1 bsl 16 + 1, packet_id_to_seqno(1 bsl 16 + 1, 1)), + ?assertEqual(1 bsl 16 + 1, packet_id_to_seqno(2 bsl 16, 1)), + ?assertEqual(2 bsl 16 + 1, packet_id_to_seqno(2 bsl 16 + 1, 1)), + %% Packet ID = 16#ffff: + PID = 1 bsl 16 - 1, + ?assertEqual(PID, packet_id_to_seqno(PID, PID)), + ?assertEqual(PID, packet_id_to_seqno(1 bsl 16, PID)), + ?assertEqual(1 bsl 16 + PID, packet_id_to_seqno(2 bsl 16, PID)), + ok. + +packet_id_to_seqno_test_() -> + Opts = [{numtests, 1000}, {to_file, user}], + {timeout, 30, fun() -> ?assert(proper:quickcheck(packet_id_to_seqno_prop(), Opts)) end}. + +packet_id_to_seqno_prop() -> + ?FORALL( + NextSeqNo, + next_seqno_gen(), + ?FORALL( + SeqNo, + seqno_gen(NextSeqNo), + begin + PacketId = SeqNo rem 16#10000, + ?assertEqual(SeqNo, packet_id_to_seqno(NextSeqNo, PacketId)), + true + end + ) + ). + +next_seqno_gen() -> + ?LET( + {Epoch, Offset}, + {non_neg_integer(), non_neg_integer()}, + Epoch bsl 16 + Offset + ). + +seqno_gen(NextSeqNo) -> + WindowSize = 1 bsl 16 - 1, + Min = max(0, NextSeqNo - WindowSize), + Max = max(0, NextSeqNo - 1), + range(Min, Max). + +-endif. diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 5ab7723f7..b7b5d0df9 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -340,7 +340,7 @@ deliver(_ClientInfo, _Delivers, Session) -> -spec handle_timeout(clientinfo(), _Timeout, session()) -> {ok, replies(), session()} | {ok, replies(), timeout(), session()}. handle_timeout(_ClientInfo, pull, Session = #{id := Id, inflight := Inflight0}) -> - WindowSize = 100, + WindowSize = 1000, {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll(Id, Inflight0, WindowSize), %% TODO: make these values configurable: Timeout =