refactor(sessds): use `ds_pubrange` record as is

Instead of converting it into almost similar runtime representation.
This commit is contained in:
Andrew Mayorov 2023-11-20 14:10:53 +07:00
parent a5ff4144fe
commit 7081f1951f
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 70 additions and 95 deletions

View File

@ -41,30 +41,11 @@
%% Note: sequence numbers are monotonic; they don't wrap around: %% Note: sequence numbers are monotonic; they don't wrap around:
-type seqno() :: non_neg_integer(). -type seqno() :: non_neg_integer().
-record(range, {
stream :: _StreamRef,
first :: seqno(),
until :: seqno(),
%% Type of a range:
%% * Inflight range is a range of yet unacked messages from this stream.
%% * Checkpoint range was already acked, its purpose is to keep track of the
%% very last iterator for this stream.
type :: inflight | checkpoint,
%% Meaning of this depends on the type of the range:
%% * For inflight range, this is the iterator pointing to the first message in
%% the range.
%% * For checkpoint range, this is the iterator pointing right past the last
%% message in the range.
iterator :: emqx_ds:iterator()
}).
-type range() :: #range{}.
-record(inflight, { -record(inflight, {
next_seqno = 1 :: seqno(), next_seqno = 1 :: seqno(),
acked_until = 1 :: seqno(), acked_until = 1 :: seqno(),
%% Ranges are sorted in ascending order of their sequence numbers. %% Ranges are sorted in ascending order of their sequence numbers.
offset_ranges = [] :: [range()] offset_ranges = [] :: [ds_pubrange()]
}). }).
-opaque inflight() :: #inflight{}. -opaque inflight() :: #inflight{}.
@ -170,53 +151,51 @@ poll(SessionId, Inflight0, WindowSize) when WindowSize > 0, WindowSize < 16#7fff
compute_inflight_range([]) -> compute_inflight_range([]) ->
{1, 1}; {1, 1};
compute_inflight_range(Ranges) -> compute_inflight_range(Ranges) ->
_RangeLast = #range{until = LastSeqno} = lists:last(Ranges), _RangeLast = #ds_pubrange{until = LastSeqno} = lists:last(Ranges),
RangesUnacked = lists:dropwhile(fun(#range{type = T}) -> T == checkpoint end, Ranges), RangesUnacked = lists:dropwhile(
fun(#ds_pubrange{type = T}) -> T == checkpoint end,
Ranges
),
case RangesUnacked of case RangesUnacked of
[#range{first = AckedUntil} | _] -> [#ds_pubrange{id = {_, AckedUntil}} | _] ->
{AckedUntil, LastSeqno}; {AckedUntil, LastSeqno};
[] -> [] ->
{LastSeqno, LastSeqno} {LastSeqno, LastSeqno}
end. end.
-spec get_ranges(emqx_persistent_session_ds:id()) -> [ds_pubrange()].
get_ranges(SessionId) -> get_ranges(SessionId) ->
DSRanges = mnesia:match_object( Pat = erlang:make_tuple(
?SESSION_PUBRANGE_TAB, record_info(size, ds_pubrange),
#ds_pubrange{id = {SessionId, '_'}, _ = '_'}, '_',
read [{1, ds_pubrange}, {#ds_pubrange.id, {SessionId, '_'}}]
), ),
lists:map(fun export_range/1, DSRanges). mnesia:match_object(?SESSION_PUBRANGE_TAB, Pat, read).
export_range(#ds_pubrange{
type = Type, id = {_, First}, until = Until, stream = StreamRef, iterator = It
}) ->
#range{type = Type, stream = StreamRef, first = First, until = Until, iterator = It}.
fetch(SessionId, Inflight0, [DSStream | Streams], N, Acc) when N > 0 -> fetch(SessionId, Inflight0, [DSStream | Streams], N, Acc) when N > 0 ->
#inflight{next_seqno = FirstSeqno, offset_ranges = Ranges0} = Inflight0, #inflight{next_seqno = FirstSeqno, offset_ranges = Ranges} = Inflight0,
ItBegin = get_last_iterator(DSStream, Ranges0), ItBegin = get_last_iterator(DSStream, Ranges),
{ok, ItEnd, Messages} = emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, N), {ok, ItEnd, Messages} = emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, N),
{Publishes, UntilSeqno} = publish(FirstSeqno, Messages), {Publishes, UntilSeqno} = publish(FirstSeqno, Messages),
case range_size(FirstSeqno, UntilSeqno) of case range_size(FirstSeqno, UntilSeqno) of
Size when Size > 0 -> Size when Size > 0 ->
Range0 = #range{ %% We need to preserve the iterator pointing to the beginning of the
%% range, so that we can replay it if needed.
Range0 = #ds_pubrange{
id = {SessionId, FirstSeqno},
type = inflight, type = inflight,
first = FirstSeqno,
until = UntilSeqno, until = UntilSeqno,
stream = DSStream#ds_stream.ref, stream = DSStream#ds_stream.ref,
iterator = ItBegin iterator = ItBegin
}, },
%% We need to preserve the iterator pointing to the beginning of the ok = preserve_range(Range0),
%% range, so that we can replay it if needed.
ok = preserve_range(SessionId, Range0),
%% ...Yet we need to keep the iterator pointing past the end of the %% ...Yet we need to keep the iterator pointing past the end of the
%% range, so that we can pick up where we left off: it will become %% range, so that we can pick up where we left off: it will become
%% `ItBegin` of the next range for this stream. %% `ItBegin` of the next range for this stream.
Range = Range0#range{iterator = ItEnd}, Range = Range0#ds_pubrange{iterator = ItEnd},
Ranges = Ranges0 ++ [Range#range{iterator = ItEnd}],
Inflight = Inflight0#inflight{ Inflight = Inflight0#inflight{
next_seqno = UntilSeqno, next_seqno = UntilSeqno,
offset_ranges = Ranges offset_ranges = Ranges ++ [Range]
}, },
fetch(SessionId, Inflight, Streams, N - Size, [Publishes | Acc]); fetch(SessionId, Inflight, Streams, N - Size, [Publishes | Acc]);
0 -> 0 ->
@ -238,7 +217,7 @@ discard_acked(
find_checkpoints(Ranges) -> find_checkpoints(Ranges) ->
lists:foldl( lists:foldl(
fun(#range{stream = StreamRef, until = Until}, Acc) -> fun(#ds_pubrange{stream = StreamRef, until = Until}, Acc) ->
%% For each stream, remember the last range over this stream. %% For each stream, remember the last range over this stream.
Acc#{StreamRef => Until} Acc#{StreamRef => Until}
end, end,
@ -250,7 +229,7 @@ discard_acked_ranges(
SessionId, SessionId,
AckedUntil, AckedUntil,
Checkpoints, Checkpoints,
[Range = #range{until = Until, stream = StreamRef} | Rest] [Range = #ds_pubrange{until = Until, stream = StreamRef} | Rest]
) when Until =< AckedUntil -> ) when Until =< AckedUntil ->
%% This range has been fully acked. %% This range has been fully acked.
%% Either discard it completely, or preserve the iterator for the next range %% Either discard it completely, or preserve the iterator for the next range
@ -258,11 +237,10 @@ discard_acked_ranges(
RangeKept = RangeKept =
case maps:get(StreamRef, Checkpoints) of case maps:get(StreamRef, Checkpoints) of
CP when CP > Until -> CP when CP > Until ->
discard_range(SessionId, Range), discard_range(Range),
[]; [];
Until -> Until ->
checkpoint_range(SessionId, Range), [checkpoint_range(Range)]
[Range#range{type = checkpoint}]
end, end,
%% Since we're (intentionally) not using transactions here, it's important to %% Since we're (intentionally) not using transactions here, it's important to
%% issue database writes in the same order in which ranges are stored: from %% issue database writes in the same order in which ranges are stored: from
@ -274,7 +252,7 @@ discard_acked_ranges(_SessionId, _AckedUntil, _Checkpoints, Ranges) ->
Ranges. Ranges.
replay_range( replay_range(
Range0 = #range{type = inflight, first = First, until = Until, iterator = It}, Range0 = #ds_pubrange{type = inflight, id = {_, First}, until = Until, iterator = It},
AckedUntil, AckedUntil,
Acc Acc
) -> ) ->
@ -290,9 +268,11 @@ replay_range(
end, end,
%% Asserting that range is consistent with the message storage state. %% Asserting that range is consistent with the message storage state.
{Replies, Until} = publish(FirstUnacked, MessagesUnacked), {Replies, Until} = publish(FirstUnacked, MessagesUnacked),
Range = Range0#range{iterator = ItNext}, %% Again, we need to keep the iterator pointing past the end of the
%% range, so that we can pick up where we left off.
Range = Range0#ds_pubrange{iterator = ItNext},
{Range, Replies ++ Acc}; {Range, Replies ++ Acc};
replay_range(Range0 = #range{type = checkpoint}, _AckedUntil, Acc) -> replay_range(Range0 = #ds_pubrange{type = checkpoint}, _AckedUntil, Acc) ->
{Range0, Acc}. {Range0, Acc}.
publish(FirstSeqno, Messages) -> publish(FirstSeqno, Messages) ->
@ -305,46 +285,28 @@ publish(FirstSeqno, Messages) ->
Messages Messages
). ).
-spec preserve_range(emqx_persistent_session_ds:id(), range()) -> ok. -spec preserve_range(ds_pubrange()) -> ok.
preserve_range( preserve_range(Range = #ds_pubrange{type = inflight}) ->
SessionId, mria:dirty_write(?SESSION_PUBRANGE_TAB, Range).
#range{first = First, until = Until, stream = StreamRef, iterator = It}
) ->
DSRange = #ds_pubrange{
id = {SessionId, First},
until = Until,
stream = StreamRef,
type = inflight,
iterator = It
},
mria:dirty_write(?SESSION_PUBRANGE_TAB, DSRange).
-spec discard_range(emqx_persistent_session_ds:id(), range()) -> ok. -spec discard_range(ds_pubrange()) -> ok.
discard_range(SessionId, #range{first = First}) -> discard_range(#ds_pubrange{id = RangeId}) ->
mria:dirty_delete(?SESSION_PUBRANGE_TAB, {SessionId, First}). mria:dirty_delete(?SESSION_PUBRANGE_TAB, RangeId).
-spec checkpoint_range(emqx_persistent_session_ds:id(), range()) -> ok. -spec checkpoint_range(ds_pubrange()) -> ds_pubrange().
checkpoint_range( checkpoint_range(Range0 = #ds_pubrange{type = inflight}) ->
SessionId, Range = Range0#ds_pubrange{type = checkpoint},
#range{type = inflight, first = First, until = Until, stream = StreamRef, iterator = ItNext} ok = mria:dirty_write(?SESSION_PUBRANGE_TAB, Range),
) -> Range;
DSRange = #ds_pubrange{ checkpoint_range(Range = #ds_pubrange{type = checkpoint}) ->
id = {SessionId, First},
until = Until,
stream = StreamRef,
type = checkpoint,
iterator = ItNext
},
mria:dirty_write(?SESSION_PUBRANGE_TAB, DSRange);
checkpoint_range(_SessionId, #range{type = checkpoint}) ->
%% This range should have been checkpointed already. %% This range should have been checkpointed already.
ok. Range.
get_last_iterator(DSStream = #ds_stream{ref = StreamRef}, Ranges) -> get_last_iterator(DSStream = #ds_stream{ref = StreamRef}, Ranges) ->
case lists:keyfind(StreamRef, #range.stream, lists:reverse(Ranges)) of case lists:keyfind(StreamRef, #ds_pubrange.stream, lists:reverse(Ranges)) of
false -> false ->
DSStream#ds_stream.beginning; DSStream#ds_stream.beginning;
#range{iterator = ItNext} -> #ds_pubrange{iterator = ItNext} ->
ItNext ItNext
end. end.
@ -380,7 +342,7 @@ packet_id_to_seqno(NextSeqNo, PacketId) ->
packet_id_to_seqno_(Epoch, PacketId) -> packet_id_to_seqno_(Epoch, PacketId) ->
(Epoch bsl 16) + PacketId. (Epoch bsl 16) + PacketId.
-spec seqno_to_packet_id(seqno()) -> emqx_types:packet_id(). -spec seqno_to_packet_id(seqno()) -> emqx_types:packet_id() | 0.
seqno_to_packet_id(Seqno) -> seqno_to_packet_id(Seqno) ->
Seqno rem 16#10000. Seqno rem 16#10000.
@ -475,21 +437,21 @@ compute_inflight_range_test_() ->
?_assertEqual( ?_assertEqual(
{12, 42}, {12, 42},
compute_inflight_range([ compute_inflight_range([
#range{first = 1, until = 2, type = checkpoint}, #ds_pubrange{id = {<<>>, 1}, until = 2, type = checkpoint},
#range{first = 4, until = 8, type = checkpoint}, #ds_pubrange{id = {<<>>, 4}, until = 8, type = checkpoint},
#range{first = 11, until = 12, type = checkpoint}, #ds_pubrange{id = {<<>>, 11}, until = 12, type = checkpoint},
#range{first = 12, until = 13, type = inflight}, #ds_pubrange{id = {<<>>, 12}, until = 13, type = inflight},
#range{first = 13, until = 20, type = inflight}, #ds_pubrange{id = {<<>>, 13}, until = 20, type = inflight},
#range{first = 20, until = 42, type = inflight} #ds_pubrange{id = {<<>>, 20}, until = 42, type = inflight}
]) ])
), ),
?_assertEqual( ?_assertEqual(
{13, 13}, {13, 13},
compute_inflight_range([ compute_inflight_range([
#range{first = 1, until = 2, type = checkpoint}, #ds_pubrange{id = {<<>>, 1}, until = 2, type = checkpoint},
#range{first = 4, until = 8, type = checkpoint}, #ds_pubrange{id = {<<>>, 4}, until = 8, type = checkpoint},
#range{first = 11, until = 12, type = checkpoint}, #ds_pubrange{id = {<<>>, 11}, until = 12, type = checkpoint},
#range{first = 12, until = 13, type = checkpoint} #ds_pubrange{id = {<<>>, 12}, until = 13, type = checkpoint}
]) ])
) )
]. ].

View File

@ -43,12 +43,25 @@
-record(ds_pubrange, { -record(ds_pubrange, {
id :: { id :: {
%% What session this range belongs to.
_Session :: emqx_persistent_session_ds:id(), _Session :: emqx_persistent_session_ds:id(),
%% Where this range starts.
_First :: emqx_persistent_message_ds_replayer:seqno() _First :: emqx_persistent_message_ds_replayer:seqno()
}, },
%% Where this range ends: the first seqno that is not included in the range.
until :: emqx_persistent_message_ds_replayer:seqno(), until :: emqx_persistent_message_ds_replayer:seqno(),
%% Which stream this range is over.
stream :: _StreamRef, stream :: _StreamRef,
%% Type of a range:
%% * Inflight range is a range of yet unacked messages from this stream.
%% * Checkpoint range was already acked, its purpose is to keep track of the
%% very last iterator for this stream.
type :: inflight | checkpoint, type :: inflight | checkpoint,
%% Meaning of this depends on the type of the range:
%% * For inflight range, this is the iterator pointing to the first message in
%% the range.
%% * For checkpoint range, this is the iterator pointing right past the last
%% message in the range.
iterator :: emqx_ds:iterator() iterator :: emqx_ds:iterator()
}). }).
-type ds_pubrange() :: #ds_pubrange{}. -type ds_pubrange() :: #ds_pubrange{}.