diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index d1e60f0ae..12b7c68a2 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -66,7 +66,10 @@ -opaque inflight() :: #inflight{}. --type reply_fun() :: fun((seqno(), emqx_types:message()) -> emqx_session:reply()). +-type reply_fun() :: fun( + (seqno(), emqx_types:message()) -> + emqx_session:replies() | {_AdvanceSeqno :: false, emqx_session:replies()} +). %%================================================================================ %% API funcions @@ -222,8 +225,8 @@ find_committed_until(Track, Ranges) -> case Range of #ds_pubrange{type = checkpoint} -> true; - #ds_pubrange{type = inflight} = Range -> - not has_range_track(Track, Range) + #ds_pubrange{type = inflight, tracks = Tracks} -> + not has_track(Track, Tracks) end end, Ranges @@ -259,16 +262,16 @@ fetch(ReplyFun, SessionId, Inflight0, [DSStream | Streams], N, Acc) when N > 0 - Range0 = #ds_pubrange{ id = {SessionId, FirstSeqno}, type = inflight, + tracks = Tracks, until = UntilSeqno, stream = DSStream#ds_stream.ref, iterator = ItBegin }, - Range1 = update_range_tracks(Tracks, Range0), - ok = preserve_range(Range1), + ok = preserve_range(Range0), %% ...Yet we need to keep the iterator pointing past the end of the %% range, so that we can pick up where we left off: it will become %% `ItBegin` of the next range for this stream. - Range = keep_next_iterator(ItEnd, Range1), + Range = keep_next_iterator(ItEnd, Range0), Inflight = Inflight0#inflight{ next_seqno = UntilSeqno, offset_ranges = Ranges ++ [Range] @@ -332,7 +335,7 @@ discard_committed_ranges( TracksLeft -> %% Only some track has been committed. %% Preserve the uncommitted tracks in the database. - RangeKept = update_range_tracks(TracksLeft, Range), + RangeKept = Range#ds_pubrange{tracks = TracksLeft}, preserve_range(restore_first_iterator(RangeKept)), [RangeKept | discard_committed_ranges(SessionId, Commits, Checkpoints, Rest)] end; @@ -346,11 +349,7 @@ discard_committed_range( #ds_pubrange{until = Until} ) when Until > AckedUntil andalso Until > CompUntil -> keep_all; -discard_committed_range( - Commits, - Range = #ds_pubrange{until = Until} -) -> - Tracks = get_range_tracks(Range), +discard_committed_range(Commits, #ds_pubrange{until = Until, tracks = Tracks}) -> case discard_tracks(Commits, Until, Tracks) of 0 -> discard; @@ -381,10 +380,10 @@ replay_range( Size = range_size(First, Until), {ok, ItNext, MessagesUnacked} = emqx_ds:next(?PERSISTENT_MESSAGE_DB, It, Size), %% Asserting that range is consistent with the message storage state. - {Replies, {Until, Tracks}} = publish(ReplyFun, First, MessagesUnacked), + {Replies, {Until, _TracksInitial}} = publish(ReplyFun, First, MessagesUnacked), %% Again, we need to keep the iterator pointing past the end of the %% range, so that we can pick up where we left off. - Range = keep_next_iterator(ItNext, ensure_range_tracks(Tracks, Range0)), + Range = keep_next_iterator(ItNext, Range0), {Range, Replies ++ Acc}; replay_range(_ReplyFun, Range0 = #ds_pubrange{type = checkpoint}, Acc) -> {Range0, Acc}. @@ -456,28 +455,10 @@ restore_first_iterator(Range = #ds_pubrange{misc = Misc = #{iterator_first := It misc = maps:remove(iterator_first, Misc) }. -ensure_range_tracks(_Tracks, Range = #ds_pubrange{misc = #{?T_tracks := _Existing}}) -> - Range; -ensure_range_tracks(Tracks, Range = #ds_pubrange{}) -> - update_range_tracks(Tracks, Range). - -update_range_tracks(?TRACK_FLAG(?ACK), Range = #ds_pubrange{misc = Misc}) -> - %% This is assumed as the default value for the tracks field. - Range#ds_pubrange{misc = maps:remove(?T_tracks, Misc)}; -update_range_tracks(Tracks, Range = #ds_pubrange{misc = Misc}) -> - Range#ds_pubrange{misc = Misc#{?T_tracks => Tracks}}. - -get_range_tracks(#ds_pubrange{misc = Misc}) -> - %% This is assumed as the default value for the tracks field. - maps:get(?T_tracks, Misc, ?TRACK_FLAG(?ACK)). - -spec preserve_range(ds_pubrange()) -> ok. preserve_range(Range = #ds_pubrange{type = inflight}) -> mria:dirty_write(?SESSION_PUBRANGE_TAB, Range). -has_range_track(Track, Range) -> - has_track(Track, get_range_tracks(Range)). - has_track(ack, Tracks) -> (?TRACK_FLAG(?ACK) band Tracks) > 0; has_track(comp, Tracks) -> @@ -640,19 +621,19 @@ compute_inflight_range_test_() -> id = {<<>>, 12}, until = 13, type = inflight, - misc = #{} + tracks = ?TRACK_FLAG(?ACK) }, #ds_pubrange{ id = {<<>>, 13}, until = 20, type = inflight, - misc = #{?T_tracks => ?TRACK_FLAG(?COMP)} + tracks = ?TRACK_FLAG(?COMP) }, #ds_pubrange{ id = {<<>>, 20}, until = 42, type = inflight, - misc = #{?T_tracks => ?TRACK_FLAG(?ACK) bor ?TRACK_FLAG(?COMP)} + tracks = ?TRACK_FLAG(?ACK) bor ?TRACK_FLAG(?COMP) } ]) ), diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index 24c14f7eb..779f56736 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -25,9 +25,6 @@ -define(SESSION_MARKER_TAB, emqx_ds_marker_tab). -define(DS_MRIA_SHARD, emqx_ds_session_shard). -%% Integer tags for `misc` maps keys. --define(T_tracks, 1). - -record(ds_sub, { id :: emqx_persistent_session_ds:subscription_id(), start_time :: emqx_ds:time(), @@ -61,6 +58,10 @@ %% * Checkpoint range was already acked, its purpose is to keep track of the %% very last iterator for this stream. type :: inflight | checkpoint, + %% What commit tracks this range is part of. + %% This is rarely stored: we only need to persist it when the range contains + %% QoS 2 messages. + tracks = 0 :: non_neg_integer(), %% Meaning of this depends on the type of the range: %% * For inflight range, this is the iterator pointing to the first message in %% the range. @@ -68,13 +69,7 @@ %% message in the range. iterator :: emqx_ds:iterator(), %% Reserved for future use. - misc = #{} :: #{ - %% What commit tracks this range is part of. - %% This is rarely stored: we only need to persist it when the range - %% contains QoS 2 messages. - ?T_tracks => non_neg_integer(), - _ => _ - } + misc = #{} :: map() }). -type ds_pubrange() :: #ds_pubrange{}.