From 86685bdce27dbe919c093bcb3ffa1ca64291ef08 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 27 Nov 2023 18:27:15 +0300 Subject: [PATCH] feat(sessds): use integer tags for pubrange types --- .../emqx_persistent_message_ds_replayer.erl | 44 +++++++++---------- apps/emqx/src/emqx_persistent_session_ds.hrl | 5 ++- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index 12b7c68a2..865459150 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -107,9 +107,9 @@ n_inflight(#inflight{offset_ranges = Ranges}) -> %% actual `AckedUntil` / `CompUntil` during `commit_offset/4`. lists:foldl( fun - (#ds_pubrange{type = checkpoint}, N) -> + (#ds_pubrange{type = ?T_CHECKPOINT}, N) -> N; - (#ds_pubrange{type = inflight, id = {_, First}, until = Until}, N) -> + (#ds_pubrange{type = ?T_INFLIGHT, id = {_, First}, until = Until}, N) -> N + range_size(First, Until) end, 0, @@ -223,9 +223,9 @@ find_committed_until(Track, Ranges) -> RangesUncommitted = lists:dropwhile( fun(Range) -> case Range of - #ds_pubrange{type = checkpoint} -> + #ds_pubrange{type = ?T_CHECKPOINT} -> true; - #ds_pubrange{type = inflight, tracks = Tracks} -> + #ds_pubrange{type = ?T_INFLIGHT, tracks = Tracks} -> not has_track(Track, Tracks) end end, @@ -261,7 +261,7 @@ fetch(ReplyFun, SessionId, Inflight0, [DSStream | Streams], N, Acc) when N > 0 - Size = range_size(FirstSeqno, UntilSeqno), Range0 = #ds_pubrange{ id = {SessionId, FirstSeqno}, - type = inflight, + type = ?T_INFLIGHT, tracks = Tracks, until = UntilSeqno, stream = DSStream#ds_stream.ref, @@ -342,7 +342,7 @@ discard_committed_ranges( discard_committed_ranges(_SessionId, _Commits, _Checkpoints, []) -> []. -discard_committed_range(_Commits, #ds_pubrange{type = checkpoint}) -> +discard_committed_range(_Commits, #ds_pubrange{type = ?T_CHECKPOINT}) -> discard; discard_committed_range( #{ack := AckedUntil, comp := CompUntil}, @@ -374,7 +374,7 @@ discard_tracks(#{ack := AckedUntil, comp := CompUntil}, Until, Tracks) -> replay_range( ReplyFun, - Range0 = #ds_pubrange{type = inflight, id = {_, First}, until = Until, iterator = It}, + Range0 = #ds_pubrange{type = ?T_INFLIGHT, id = {_, First}, until = Until, iterator = It}, Acc ) -> Size = range_size(First, Until), @@ -385,7 +385,7 @@ replay_range( %% range, so that we can pick up where we left off. Range = keep_next_iterator(ItNext, Range0), {Range, Replies ++ Acc}; -replay_range(_ReplyFun, Range0 = #ds_pubrange{type = checkpoint}, Acc) -> +replay_range(_ReplyFun, Range0 = #ds_pubrange{type = ?T_CHECKPOINT}, Acc) -> {Range0, Acc}. validate_commit( @@ -456,7 +456,7 @@ restore_first_iterator(Range = #ds_pubrange{misc = Misc = #{iterator_first := It }. -spec preserve_range(ds_pubrange()) -> ok. -preserve_range(Range = #ds_pubrange{type = inflight}) -> +preserve_range(Range = #ds_pubrange{type = ?T_INFLIGHT}) -> mria:dirty_write(?SESSION_PUBRANGE_TAB, Range). has_track(ack, Tracks) -> @@ -469,11 +469,11 @@ discard_range(#ds_pubrange{id = RangeId}) -> mria:dirty_delete(?SESSION_PUBRANGE_TAB, RangeId). -spec checkpoint_range(ds_pubrange()) -> ds_pubrange(). -checkpoint_range(Range0 = #ds_pubrange{type = inflight}) -> - Range = Range0#ds_pubrange{type = checkpoint, misc = #{}}, +checkpoint_range(Range0 = #ds_pubrange{type = ?T_INFLIGHT}) -> + Range = Range0#ds_pubrange{type = ?T_CHECKPOINT, misc = #{}}, ok = mria:dirty_write(?SESSION_PUBRANGE_TAB, Range), Range; -checkpoint_range(Range = #ds_pubrange{type = checkpoint}) -> +checkpoint_range(Range = #ds_pubrange{type = ?T_CHECKPOINT}) -> %% This range should have been checkpointed already. Range. @@ -614,25 +614,25 @@ compute_inflight_range_test_() -> ?_assertEqual( {#{ack => 12, comp => 13}, 42}, compute_inflight_range([ - #ds_pubrange{id = {<<>>, 1}, until = 2, type = checkpoint}, - #ds_pubrange{id = {<<>>, 4}, until = 8, type = checkpoint}, - #ds_pubrange{id = {<<>>, 11}, until = 12, type = checkpoint}, + #ds_pubrange{id = {<<>>, 1}, until = 2, type = ?T_CHECKPOINT}, + #ds_pubrange{id = {<<>>, 4}, until = 8, type = ?T_CHECKPOINT}, + #ds_pubrange{id = {<<>>, 11}, until = 12, type = ?T_CHECKPOINT}, #ds_pubrange{ id = {<<>>, 12}, until = 13, - type = inflight, + type = ?T_INFLIGHT, tracks = ?TRACK_FLAG(?ACK) }, #ds_pubrange{ id = {<<>>, 13}, until = 20, - type = inflight, + type = ?T_INFLIGHT, tracks = ?TRACK_FLAG(?COMP) }, #ds_pubrange{ id = {<<>>, 20}, until = 42, - type = inflight, + type = ?T_INFLIGHT, tracks = ?TRACK_FLAG(?ACK) bor ?TRACK_FLAG(?COMP) } ]) @@ -640,10 +640,10 @@ compute_inflight_range_test_() -> ?_assertEqual( {#{ack => 13, comp => 13}, 13}, compute_inflight_range([ - #ds_pubrange{id = {<<>>, 1}, until = 2, type = checkpoint}, - #ds_pubrange{id = {<<>>, 4}, until = 8, type = checkpoint}, - #ds_pubrange{id = {<<>>, 11}, until = 12, type = checkpoint}, - #ds_pubrange{id = {<<>>, 12}, until = 13, type = checkpoint} + #ds_pubrange{id = {<<>>, 1}, until = 2, type = ?T_CHECKPOINT}, + #ds_pubrange{id = {<<>>, 4}, until = 8, type = ?T_CHECKPOINT}, + #ds_pubrange{id = {<<>>, 11}, until = 12, type = ?T_CHECKPOINT}, + #ds_pubrange{id = {<<>>, 12}, until = 13, type = ?T_CHECKPOINT} ]) ) ]. diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index 779f56736..73ff609b5 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -25,6 +25,9 @@ -define(SESSION_MARKER_TAB, emqx_ds_marker_tab). -define(DS_MRIA_SHARD, emqx_ds_session_shard). +-define(T_INFLIGHT, 1). +-define(T_CHECKPOINT, 2). + -record(ds_sub, { id :: emqx_persistent_session_ds:subscription_id(), start_time :: emqx_ds:time(), @@ -57,7 +60,7 @@ %% * Inflight range is a range of yet unacked messages from this stream. %% * Checkpoint range was already acked, its purpose is to keep track of the %% very last iterator for this stream. - type :: inflight | checkpoint, + type :: ?T_INFLIGHT | ?T_CHECKPOINT, %% What commit tracks this range is part of. %% This is rarely stored: we only need to persist it when the range contains %% QoS 2 messages.