diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index a95e1c152..2f5348938 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -41,30 +41,11 @@ %% Note: sequence numbers are monotonic; they don't wrap around: -type seqno() :: non_neg_integer(). --record(range, { - stream :: _StreamRef, - first :: seqno(), - until :: seqno(), - %% Type of a range: - %% * Inflight range is a range of yet unacked messages from this stream. - %% * Checkpoint range was already acked, its purpose is to keep track of the - %% very last iterator for this stream. - type :: inflight | checkpoint, - %% Meaning of this depends on the type of the range: - %% * For inflight range, this is the iterator pointing to the first message in - %% the range. - %% * For checkpoint range, this is the iterator pointing right past the last - %% message in the range. - iterator :: emqx_ds:iterator() -}). - --type range() :: #range{}. - -record(inflight, { next_seqno = 1 :: seqno(), acked_until = 1 :: seqno(), %% Ranges are sorted in ascending order of their sequence numbers. - offset_ranges = [] :: [range()] + offset_ranges = [] :: [ds_pubrange()] }). -opaque inflight() :: #inflight{}. @@ -170,53 +151,51 @@ poll(SessionId, Inflight0, WindowSize) when WindowSize > 0, WindowSize < 16#7fff compute_inflight_range([]) -> {1, 1}; compute_inflight_range(Ranges) -> - _RangeLast = #range{until = LastSeqno} = lists:last(Ranges), - RangesUnacked = lists:dropwhile(fun(#range{type = T}) -> T == checkpoint end, Ranges), + _RangeLast = #ds_pubrange{until = LastSeqno} = lists:last(Ranges), + RangesUnacked = lists:dropwhile( + fun(#ds_pubrange{type = T}) -> T == checkpoint end, + Ranges + ), case RangesUnacked of - [#range{first = AckedUntil} | _] -> + [#ds_pubrange{id = {_, AckedUntil}} | _] -> {AckedUntil, LastSeqno}; [] -> {LastSeqno, LastSeqno} end. +-spec get_ranges(emqx_persistent_session_ds:id()) -> [ds_pubrange()]. get_ranges(SessionId) -> - DSRanges = mnesia:match_object( - ?SESSION_PUBRANGE_TAB, - #ds_pubrange{id = {SessionId, '_'}, _ = '_'}, - read + Pat = erlang:make_tuple( + record_info(size, ds_pubrange), + '_', + [{1, ds_pubrange}, {#ds_pubrange.id, {SessionId, '_'}}] ), - lists:map(fun export_range/1, DSRanges). - -export_range(#ds_pubrange{ - type = Type, id = {_, First}, until = Until, stream = StreamRef, iterator = It -}) -> - #range{type = Type, stream = StreamRef, first = First, until = Until, iterator = It}. + mnesia:match_object(?SESSION_PUBRANGE_TAB, Pat, read). fetch(SessionId, Inflight0, [DSStream | Streams], N, Acc) when N > 0 -> - #inflight{next_seqno = FirstSeqno, offset_ranges = Ranges0} = Inflight0, - ItBegin = get_last_iterator(DSStream, Ranges0), + #inflight{next_seqno = FirstSeqno, offset_ranges = Ranges} = Inflight0, + ItBegin = get_last_iterator(DSStream, Ranges), {ok, ItEnd, Messages} = emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, N), {Publishes, UntilSeqno} = publish(FirstSeqno, Messages), case range_size(FirstSeqno, UntilSeqno) of Size when Size > 0 -> - Range0 = #range{ + %% We need to preserve the iterator pointing to the beginning of the + %% range, so that we can replay it if needed. + Range0 = #ds_pubrange{ + id = {SessionId, FirstSeqno}, type = inflight, - first = FirstSeqno, until = UntilSeqno, stream = DSStream#ds_stream.ref, iterator = ItBegin }, - %% We need to preserve the iterator pointing to the beginning of the - %% range, so that we can replay it if needed. - ok = preserve_range(SessionId, Range0), + ok = preserve_range(Range0), %% ...Yet we need to keep the iterator pointing past the end of the %% range, so that we can pick up where we left off: it will become %% `ItBegin` of the next range for this stream. - Range = Range0#range{iterator = ItEnd}, - Ranges = Ranges0 ++ [Range#range{iterator = ItEnd}], + Range = Range0#ds_pubrange{iterator = ItEnd}, Inflight = Inflight0#inflight{ next_seqno = UntilSeqno, - offset_ranges = Ranges + offset_ranges = Ranges ++ [Range] }, fetch(SessionId, Inflight, Streams, N - Size, [Publishes | Acc]); 0 -> @@ -238,7 +217,7 @@ discard_acked( find_checkpoints(Ranges) -> lists:foldl( - fun(#range{stream = StreamRef, until = Until}, Acc) -> + fun(#ds_pubrange{stream = StreamRef, until = Until}, Acc) -> %% For each stream, remember the last range over this stream. Acc#{StreamRef => Until} end, @@ -250,7 +229,7 @@ discard_acked_ranges( SessionId, AckedUntil, Checkpoints, - [Range = #range{until = Until, stream = StreamRef} | Rest] + [Range = #ds_pubrange{until = Until, stream = StreamRef} | Rest] ) when Until =< AckedUntil -> %% This range has been fully acked. %% Either discard it completely, or preserve the iterator for the next range @@ -258,11 +237,10 @@ discard_acked_ranges( RangeKept = case maps:get(StreamRef, Checkpoints) of CP when CP > Until -> - discard_range(SessionId, Range), + discard_range(Range), []; Until -> - checkpoint_range(SessionId, Range), - [Range#range{type = checkpoint}] + [checkpoint_range(Range)] end, %% Since we're (intentionally) not using transactions here, it's important to %% issue database writes in the same order in which ranges are stored: from @@ -274,7 +252,7 @@ discard_acked_ranges(_SessionId, _AckedUntil, _Checkpoints, Ranges) -> Ranges. replay_range( - Range0 = #range{type = inflight, first = First, until = Until, iterator = It}, + Range0 = #ds_pubrange{type = inflight, id = {_, First}, until = Until, iterator = It}, AckedUntil, Acc ) -> @@ -290,9 +268,11 @@ replay_range( end, %% Asserting that range is consistent with the message storage state. {Replies, Until} = publish(FirstUnacked, MessagesUnacked), - Range = Range0#range{iterator = ItNext}, + %% Again, we need to keep the iterator pointing past the end of the + %% range, so that we can pick up where we left off. + Range = Range0#ds_pubrange{iterator = ItNext}, {Range, Replies ++ Acc}; -replay_range(Range0 = #range{type = checkpoint}, _AckedUntil, Acc) -> +replay_range(Range0 = #ds_pubrange{type = checkpoint}, _AckedUntil, Acc) -> {Range0, Acc}. publish(FirstSeqno, Messages) -> @@ -305,46 +285,28 @@ publish(FirstSeqno, Messages) -> Messages ). --spec preserve_range(emqx_persistent_session_ds:id(), range()) -> ok. -preserve_range( - SessionId, - #range{first = First, until = Until, stream = StreamRef, iterator = It} -) -> - DSRange = #ds_pubrange{ - id = {SessionId, First}, - until = Until, - stream = StreamRef, - type = inflight, - iterator = It - }, - mria:dirty_write(?SESSION_PUBRANGE_TAB, DSRange). +-spec preserve_range(ds_pubrange()) -> ok. +preserve_range(Range = #ds_pubrange{type = inflight}) -> + mria:dirty_write(?SESSION_PUBRANGE_TAB, Range). --spec discard_range(emqx_persistent_session_ds:id(), range()) -> ok. -discard_range(SessionId, #range{first = First}) -> - mria:dirty_delete(?SESSION_PUBRANGE_TAB, {SessionId, First}). +-spec discard_range(ds_pubrange()) -> ok. +discard_range(#ds_pubrange{id = RangeId}) -> + mria:dirty_delete(?SESSION_PUBRANGE_TAB, RangeId). --spec checkpoint_range(emqx_persistent_session_ds:id(), range()) -> ok. -checkpoint_range( - SessionId, - #range{type = inflight, first = First, until = Until, stream = StreamRef, iterator = ItNext} -) -> - DSRange = #ds_pubrange{ - id = {SessionId, First}, - until = Until, - stream = StreamRef, - type = checkpoint, - iterator = ItNext - }, - mria:dirty_write(?SESSION_PUBRANGE_TAB, DSRange); -checkpoint_range(_SessionId, #range{type = checkpoint}) -> +-spec checkpoint_range(ds_pubrange()) -> ds_pubrange(). +checkpoint_range(Range0 = #ds_pubrange{type = inflight}) -> + Range = Range0#ds_pubrange{type = checkpoint}, + ok = mria:dirty_write(?SESSION_PUBRANGE_TAB, Range), + Range; +checkpoint_range(Range = #ds_pubrange{type = checkpoint}) -> %% This range should have been checkpointed already. - ok. + Range. get_last_iterator(DSStream = #ds_stream{ref = StreamRef}, Ranges) -> - case lists:keyfind(StreamRef, #range.stream, lists:reverse(Ranges)) of + case lists:keyfind(StreamRef, #ds_pubrange.stream, lists:reverse(Ranges)) of false -> DSStream#ds_stream.beginning; - #range{iterator = ItNext} -> + #ds_pubrange{iterator = ItNext} -> ItNext end. @@ -380,7 +342,7 @@ packet_id_to_seqno(NextSeqNo, PacketId) -> packet_id_to_seqno_(Epoch, PacketId) -> (Epoch bsl 16) + PacketId. --spec seqno_to_packet_id(seqno()) -> emqx_types:packet_id(). +-spec seqno_to_packet_id(seqno()) -> emqx_types:packet_id() | 0. seqno_to_packet_id(Seqno) -> Seqno rem 16#10000. @@ -475,21 +437,21 @@ compute_inflight_range_test_() -> ?_assertEqual( {12, 42}, compute_inflight_range([ - #range{first = 1, until = 2, type = checkpoint}, - #range{first = 4, until = 8, type = checkpoint}, - #range{first = 11, until = 12, type = checkpoint}, - #range{first = 12, until = 13, type = inflight}, - #range{first = 13, until = 20, type = inflight}, - #range{first = 20, until = 42, type = inflight} + #ds_pubrange{id = {<<>>, 1}, until = 2, type = checkpoint}, + #ds_pubrange{id = {<<>>, 4}, until = 8, type = checkpoint}, + #ds_pubrange{id = {<<>>, 11}, until = 12, type = checkpoint}, + #ds_pubrange{id = {<<>>, 12}, until = 13, type = inflight}, + #ds_pubrange{id = {<<>>, 13}, until = 20, type = inflight}, + #ds_pubrange{id = {<<>>, 20}, until = 42, type = inflight} ]) ), ?_assertEqual( {13, 13}, compute_inflight_range([ - #range{first = 1, until = 2, type = checkpoint}, - #range{first = 4, until = 8, type = checkpoint}, - #range{first = 11, until = 12, type = checkpoint}, - #range{first = 12, until = 13, type = checkpoint} + #ds_pubrange{id = {<<>>, 1}, until = 2, type = checkpoint}, + #ds_pubrange{id = {<<>>, 4}, until = 8, type = checkpoint}, + #ds_pubrange{id = {<<>>, 11}, until = 12, type = checkpoint}, + #ds_pubrange{id = {<<>>, 12}, until = 13, type = checkpoint} ]) ) ]. diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index a3ea5a662..f84519901 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -43,12 +43,25 @@ -record(ds_pubrange, { id :: { + %% What session this range belongs to. _Session :: emqx_persistent_session_ds:id(), + %% Where this range starts. _First :: emqx_persistent_message_ds_replayer:seqno() }, + %% Where this range ends: the first seqno that is not included in the range. until :: emqx_persistent_message_ds_replayer:seqno(), + %% Which stream this range is over. stream :: _StreamRef, + %% Type of a range: + %% * Inflight range is a range of yet unacked messages from this stream. + %% * Checkpoint range was already acked, its purpose is to keep track of the + %% very last iterator for this stream. type :: inflight | checkpoint, + %% Meaning of this depends on the type of the range: + %% * For inflight range, this is the iterator pointing to the first message in + %% the range. + %% * For checkpoint range, this is the iterator pointing right past the last + %% message in the range. iterator :: emqx_ds:iterator() }). -type ds_pubrange() :: #ds_pubrange{}.