From 24710d0f56a1e92a00c8d0872153f3823b35b16f Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 8 Dec 2023 11:10:59 +0100 Subject: [PATCH] fix(sessds): avoid accumulating QoS0-only ranges in memory --- .../emqx_persistent_message_ds_replayer.erl | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index 723f02a01..ba2d24bfc 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -186,7 +186,9 @@ poll(PreprocFun, SessionId, Inflight0, WindowSize) when WindowSize > 0, WindowSi true -> %% TODO: Wrap this in `mria:async_dirty/2`? Streams = shuffle(get_streams(SessionId)), - fetch(PreprocFun, SessionId, Inflight0, Streams, FreeSpace, []) + {Publihes, Inflight} = fetch(PreprocFun, SessionId, Inflight0, Streams, FreeSpace, []), + %% Discard now irrelevant QoS0-only ranges, if any. + {Publihes, discard_committed(SessionId, Inflight)} end. %% Which seqno this track is committed until. @@ -300,9 +302,9 @@ discard_committed( find_checkpoints(Ranges) -> lists:foldl( - fun(#ds_pubrange{stream = StreamRef, until = Until}, Acc) -> + fun(#ds_pubrange{stream = StreamRef} = Range, Acc) -> %% For each stream, remember the last range over this stream. - Acc#{StreamRef => Until} + Acc#{StreamRef => Range} end, #{}, Ranges @@ -312,7 +314,7 @@ discard_committed_ranges( SessionId, Commits, Checkpoints, - Ranges = [Range = #ds_pubrange{until = Until, stream = StreamRef} | Rest] + Ranges = [Range = #ds_pubrange{stream = StreamRef} | Rest] ) -> case discard_committed_range(Commits, Range) of discard -> @@ -321,11 +323,11 @@ discard_committed_ranges( %% over this stream (i.e. a checkpoint). RangeKept = case maps:get(StreamRef, Checkpoints) of - CP when CP > Until -> + Range -> + [checkpoint_range(Range)]; + _Previous -> discard_range(Range), - []; - Until -> - [checkpoint_range(Range)] + [] end, %% Since we're (intentionally) not using transactions here, it's important to %% issue database writes in the same order in which ranges are stored: from