refactor(sessds): add dedicated `#ds_pubrange.tracks` field

This slightly simplifies the replayer code.
This commit is contained in:
Andrew Mayorov 2023-11-27 17:50:14 +03:00
parent 923898eadf
commit bb05281adb
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 21 additions and 45 deletions

View File

@ -66,7 +66,10 @@
-opaque inflight() :: #inflight{}.
-type reply_fun() :: fun((seqno(), emqx_types:message()) -> emqx_session:reply()).
-type reply_fun() :: fun(
(seqno(), emqx_types:message()) ->
emqx_session:replies() | {_AdvanceSeqno :: false, emqx_session:replies()}
).
%%================================================================================
%% API funcions
@ -222,8 +225,8 @@ find_committed_until(Track, Ranges) ->
case Range of
#ds_pubrange{type = checkpoint} ->
true;
#ds_pubrange{type = inflight} = Range ->
not has_range_track(Track, Range)
#ds_pubrange{type = inflight, tracks = Tracks} ->
not has_track(Track, Tracks)
end
end,
Ranges
@ -259,16 +262,16 @@ fetch(ReplyFun, SessionId, Inflight0, [DSStream | Streams], N, Acc) when N > 0 -
Range0 = #ds_pubrange{
id = {SessionId, FirstSeqno},
type = inflight,
tracks = Tracks,
until = UntilSeqno,
stream = DSStream#ds_stream.ref,
iterator = ItBegin
},
Range1 = update_range_tracks(Tracks, Range0),
ok = preserve_range(Range1),
ok = preserve_range(Range0),
%% ...Yet we need to keep the iterator pointing past the end of the
%% range, so that we can pick up where we left off: it will become
%% `ItBegin` of the next range for this stream.
Range = keep_next_iterator(ItEnd, Range1),
Range = keep_next_iterator(ItEnd, Range0),
Inflight = Inflight0#inflight{
next_seqno = UntilSeqno,
offset_ranges = Ranges ++ [Range]
@ -332,7 +335,7 @@ discard_committed_ranges(
TracksLeft ->
%% Only some track has been committed.
%% Preserve the uncommitted tracks in the database.
RangeKept = update_range_tracks(TracksLeft, Range),
RangeKept = Range#ds_pubrange{tracks = TracksLeft},
preserve_range(restore_first_iterator(RangeKept)),
[RangeKept | discard_committed_ranges(SessionId, Commits, Checkpoints, Rest)]
end;
@ -346,11 +349,7 @@ discard_committed_range(
#ds_pubrange{until = Until}
) when Until > AckedUntil andalso Until > CompUntil ->
keep_all;
discard_committed_range(
Commits,
Range = #ds_pubrange{until = Until}
) ->
Tracks = get_range_tracks(Range),
discard_committed_range(Commits, #ds_pubrange{until = Until, tracks = Tracks}) ->
case discard_tracks(Commits, Until, Tracks) of
0 ->
discard;
@ -381,10 +380,10 @@ replay_range(
Size = range_size(First, Until),
{ok, ItNext, MessagesUnacked} = emqx_ds:next(?PERSISTENT_MESSAGE_DB, It, Size),
%% Asserting that range is consistent with the message storage state.
{Replies, {Until, Tracks}} = publish(ReplyFun, First, MessagesUnacked),
{Replies, {Until, _TracksInitial}} = publish(ReplyFun, First, MessagesUnacked),
%% Again, we need to keep the iterator pointing past the end of the
%% range, so that we can pick up where we left off.
Range = keep_next_iterator(ItNext, ensure_range_tracks(Tracks, Range0)),
Range = keep_next_iterator(ItNext, Range0),
{Range, Replies ++ Acc};
replay_range(_ReplyFun, Range0 = #ds_pubrange{type = checkpoint}, Acc) ->
{Range0, Acc}.
@ -456,28 +455,10 @@ restore_first_iterator(Range = #ds_pubrange{misc = Misc = #{iterator_first := It
misc = maps:remove(iterator_first, Misc)
}.
ensure_range_tracks(_Tracks, Range = #ds_pubrange{misc = #{?T_tracks := _Existing}}) ->
Range;
ensure_range_tracks(Tracks, Range = #ds_pubrange{}) ->
update_range_tracks(Tracks, Range).
update_range_tracks(?TRACK_FLAG(?ACK), Range = #ds_pubrange{misc = Misc}) ->
%% This is assumed as the default value for the tracks field.
Range#ds_pubrange{misc = maps:remove(?T_tracks, Misc)};
update_range_tracks(Tracks, Range = #ds_pubrange{misc = Misc}) ->
Range#ds_pubrange{misc = Misc#{?T_tracks => Tracks}}.
get_range_tracks(#ds_pubrange{misc = Misc}) ->
%% This is assumed as the default value for the tracks field.
maps:get(?T_tracks, Misc, ?TRACK_FLAG(?ACK)).
-spec preserve_range(ds_pubrange()) -> ok.
preserve_range(Range = #ds_pubrange{type = inflight}) ->
mria:dirty_write(?SESSION_PUBRANGE_TAB, Range).
has_range_track(Track, Range) ->
has_track(Track, get_range_tracks(Range)).
has_track(ack, Tracks) ->
(?TRACK_FLAG(?ACK) band Tracks) > 0;
has_track(comp, Tracks) ->
@ -640,19 +621,19 @@ compute_inflight_range_test_() ->
id = {<<>>, 12},
until = 13,
type = inflight,
misc = #{}
tracks = ?TRACK_FLAG(?ACK)
},
#ds_pubrange{
id = {<<>>, 13},
until = 20,
type = inflight,
misc = #{?T_tracks => ?TRACK_FLAG(?COMP)}
tracks = ?TRACK_FLAG(?COMP)
},
#ds_pubrange{
id = {<<>>, 20},
until = 42,
type = inflight,
misc = #{?T_tracks => ?TRACK_FLAG(?ACK) bor ?TRACK_FLAG(?COMP)}
tracks = ?TRACK_FLAG(?ACK) bor ?TRACK_FLAG(?COMP)
}
])
),

View File

@ -25,9 +25,6 @@
-define(SESSION_MARKER_TAB, emqx_ds_marker_tab).
-define(DS_MRIA_SHARD, emqx_ds_session_shard).
%% Integer tags for `misc` maps keys.
-define(T_tracks, 1).
-record(ds_sub, {
id :: emqx_persistent_session_ds:subscription_id(),
start_time :: emqx_ds:time(),
@ -61,6 +58,10 @@
%% * Checkpoint range was already acked, its purpose is to keep track of the
%% very last iterator for this stream.
type :: inflight | checkpoint,
%% What commit tracks this range is part of.
%% This is rarely stored: we only need to persist it when the range contains
%% QoS 2 messages.
tracks = 0 :: non_neg_integer(),
%% Meaning of this depends on the type of the range:
%% * For inflight range, this is the iterator pointing to the first message in
%% the range.
@ -68,13 +69,7 @@
%% message in the range.
iterator :: emqx_ds:iterator(),
%% Reserved for future use.
misc = #{} :: #{
%% What commit tracks this range is part of.
%% This is rarely stored: we only need to persist it when the range
%% contains QoS 2 messages.
?T_tracks => non_neg_integer(),
_ => _
}
misc = #{} :: map()
}).
-type ds_pubrange() :: #ds_pubrange{}.