feat(sessds): use integer tags for pubrange types

This commit is contained in:
Andrew Mayorov 2023-11-27 18:27:15 +03:00
parent bb05281adb
commit 86685bdce2
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 26 additions and 23 deletions

View File

@ -107,9 +107,9 @@ n_inflight(#inflight{offset_ranges = Ranges}) ->
%% actual `AckedUntil` / `CompUntil` during `commit_offset/4`.
lists:foldl(
fun
(#ds_pubrange{type = checkpoint}, N) ->
(#ds_pubrange{type = ?T_CHECKPOINT}, N) ->
N;
(#ds_pubrange{type = inflight, id = {_, First}, until = Until}, N) ->
(#ds_pubrange{type = ?T_INFLIGHT, id = {_, First}, until = Until}, N) ->
N + range_size(First, Until)
end,
0,
@ -223,9 +223,9 @@ find_committed_until(Track, Ranges) ->
RangesUncommitted = lists:dropwhile(
fun(Range) ->
case Range of
#ds_pubrange{type = checkpoint} ->
#ds_pubrange{type = ?T_CHECKPOINT} ->
true;
#ds_pubrange{type = inflight, tracks = Tracks} ->
#ds_pubrange{type = ?T_INFLIGHT, tracks = Tracks} ->
not has_track(Track, Tracks)
end
end,
@ -261,7 +261,7 @@ fetch(ReplyFun, SessionId, Inflight0, [DSStream | Streams], N, Acc) when N > 0 -
Size = range_size(FirstSeqno, UntilSeqno),
Range0 = #ds_pubrange{
id = {SessionId, FirstSeqno},
type = inflight,
type = ?T_INFLIGHT,
tracks = Tracks,
until = UntilSeqno,
stream = DSStream#ds_stream.ref,
@ -342,7 +342,7 @@ discard_committed_ranges(
discard_committed_ranges(_SessionId, _Commits, _Checkpoints, []) ->
[].
discard_committed_range(_Commits, #ds_pubrange{type = checkpoint}) ->
discard_committed_range(_Commits, #ds_pubrange{type = ?T_CHECKPOINT}) ->
discard;
discard_committed_range(
#{ack := AckedUntil, comp := CompUntil},
@ -374,7 +374,7 @@ discard_tracks(#{ack := AckedUntil, comp := CompUntil}, Until, Tracks) ->
replay_range(
ReplyFun,
Range0 = #ds_pubrange{type = inflight, id = {_, First}, until = Until, iterator = It},
Range0 = #ds_pubrange{type = ?T_INFLIGHT, id = {_, First}, until = Until, iterator = It},
Acc
) ->
Size = range_size(First, Until),
@ -385,7 +385,7 @@ replay_range(
%% range, so that we can pick up where we left off.
Range = keep_next_iterator(ItNext, Range0),
{Range, Replies ++ Acc};
replay_range(_ReplyFun, Range0 = #ds_pubrange{type = checkpoint}, Acc) ->
replay_range(_ReplyFun, Range0 = #ds_pubrange{type = ?T_CHECKPOINT}, Acc) ->
{Range0, Acc}.
validate_commit(
@ -456,7 +456,7 @@ restore_first_iterator(Range = #ds_pubrange{misc = Misc = #{iterator_first := It
}.
-spec preserve_range(ds_pubrange()) -> ok.
preserve_range(Range = #ds_pubrange{type = inflight}) ->
preserve_range(Range = #ds_pubrange{type = ?T_INFLIGHT}) ->
mria:dirty_write(?SESSION_PUBRANGE_TAB, Range).
has_track(ack, Tracks) ->
@ -469,11 +469,11 @@ discard_range(#ds_pubrange{id = RangeId}) ->
mria:dirty_delete(?SESSION_PUBRANGE_TAB, RangeId).
-spec checkpoint_range(ds_pubrange()) -> ds_pubrange().
checkpoint_range(Range0 = #ds_pubrange{type = inflight}) ->
Range = Range0#ds_pubrange{type = checkpoint, misc = #{}},
checkpoint_range(Range0 = #ds_pubrange{type = ?T_INFLIGHT}) ->
Range = Range0#ds_pubrange{type = ?T_CHECKPOINT, misc = #{}},
ok = mria:dirty_write(?SESSION_PUBRANGE_TAB, Range),
Range;
checkpoint_range(Range = #ds_pubrange{type = checkpoint}) ->
checkpoint_range(Range = #ds_pubrange{type = ?T_CHECKPOINT}) ->
%% This range should have been checkpointed already.
Range.
@ -614,25 +614,25 @@ compute_inflight_range_test_() ->
?_assertEqual(
{#{ack => 12, comp => 13}, 42},
compute_inflight_range([
#ds_pubrange{id = {<<>>, 1}, until = 2, type = checkpoint},
#ds_pubrange{id = {<<>>, 4}, until = 8, type = checkpoint},
#ds_pubrange{id = {<<>>, 11}, until = 12, type = checkpoint},
#ds_pubrange{id = {<<>>, 1}, until = 2, type = ?T_CHECKPOINT},
#ds_pubrange{id = {<<>>, 4}, until = 8, type = ?T_CHECKPOINT},
#ds_pubrange{id = {<<>>, 11}, until = 12, type = ?T_CHECKPOINT},
#ds_pubrange{
id = {<<>>, 12},
until = 13,
type = inflight,
type = ?T_INFLIGHT,
tracks = ?TRACK_FLAG(?ACK)
},
#ds_pubrange{
id = {<<>>, 13},
until = 20,
type = inflight,
type = ?T_INFLIGHT,
tracks = ?TRACK_FLAG(?COMP)
},
#ds_pubrange{
id = {<<>>, 20},
until = 42,
type = inflight,
type = ?T_INFLIGHT,
tracks = ?TRACK_FLAG(?ACK) bor ?TRACK_FLAG(?COMP)
}
])
@ -640,10 +640,10 @@ compute_inflight_range_test_() ->
?_assertEqual(
{#{ack => 13, comp => 13}, 13},
compute_inflight_range([
#ds_pubrange{id = {<<>>, 1}, until = 2, type = checkpoint},
#ds_pubrange{id = {<<>>, 4}, until = 8, type = checkpoint},
#ds_pubrange{id = {<<>>, 11}, until = 12, type = checkpoint},
#ds_pubrange{id = {<<>>, 12}, until = 13, type = checkpoint}
#ds_pubrange{id = {<<>>, 1}, until = 2, type = ?T_CHECKPOINT},
#ds_pubrange{id = {<<>>, 4}, until = 8, type = ?T_CHECKPOINT},
#ds_pubrange{id = {<<>>, 11}, until = 12, type = ?T_CHECKPOINT},
#ds_pubrange{id = {<<>>, 12}, until = 13, type = ?T_CHECKPOINT}
])
)
].

View File

@ -25,6 +25,9 @@
-define(SESSION_MARKER_TAB, emqx_ds_marker_tab).
-define(DS_MRIA_SHARD, emqx_ds_session_shard).
-define(T_INFLIGHT, 1).
-define(T_CHECKPOINT, 2).
-record(ds_sub, {
id :: emqx_persistent_session_ds:subscription_id(),
start_time :: emqx_ds:time(),
@ -57,7 +60,7 @@
%% * Inflight range is a range of yet unacked messages from this stream.
%% * Checkpoint range was already acked, its purpose is to keep track of the
%% very last iterator for this stream.
type :: inflight | checkpoint,
type :: ?T_INFLIGHT | ?T_CHECKPOINT,
%% What commit tracks this range is part of.
%% This is rarely stored: we only need to persist it when the range contains
%% QoS 2 messages.