fix(sessds): avoid accumulating QoS0-only ranges in memory
This commit is contained in:
parent
5384b9ea2f
commit
24710d0f56
|
@ -186,7 +186,9 @@ poll(PreprocFun, SessionId, Inflight0, WindowSize) when WindowSize > 0, WindowSi
|
||||||
true ->
|
true ->
|
||||||
%% TODO: Wrap this in `mria:async_dirty/2`?
|
%% TODO: Wrap this in `mria:async_dirty/2`?
|
||||||
Streams = shuffle(get_streams(SessionId)),
|
Streams = shuffle(get_streams(SessionId)),
|
||||||
fetch(PreprocFun, SessionId, Inflight0, Streams, FreeSpace, [])
|
{Publihes, Inflight} = fetch(PreprocFun, SessionId, Inflight0, Streams, FreeSpace, []),
|
||||||
|
%% Discard now irrelevant QoS0-only ranges, if any.
|
||||||
|
{Publihes, discard_committed(SessionId, Inflight)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% Which seqno this track is committed until.
|
%% Which seqno this track is committed until.
|
||||||
|
@ -300,9 +302,9 @@ discard_committed(
|
||||||
|
|
||||||
find_checkpoints(Ranges) ->
|
find_checkpoints(Ranges) ->
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun(#ds_pubrange{stream = StreamRef, until = Until}, Acc) ->
|
fun(#ds_pubrange{stream = StreamRef} = Range, Acc) ->
|
||||||
%% For each stream, remember the last range over this stream.
|
%% For each stream, remember the last range over this stream.
|
||||||
Acc#{StreamRef => Until}
|
Acc#{StreamRef => Range}
|
||||||
end,
|
end,
|
||||||
#{},
|
#{},
|
||||||
Ranges
|
Ranges
|
||||||
|
@ -312,7 +314,7 @@ discard_committed_ranges(
|
||||||
SessionId,
|
SessionId,
|
||||||
Commits,
|
Commits,
|
||||||
Checkpoints,
|
Checkpoints,
|
||||||
Ranges = [Range = #ds_pubrange{until = Until, stream = StreamRef} | Rest]
|
Ranges = [Range = #ds_pubrange{stream = StreamRef} | Rest]
|
||||||
) ->
|
) ->
|
||||||
case discard_committed_range(Commits, Range) of
|
case discard_committed_range(Commits, Range) of
|
||||||
discard ->
|
discard ->
|
||||||
|
@ -321,11 +323,11 @@ discard_committed_ranges(
|
||||||
%% over this stream (i.e. a checkpoint).
|
%% over this stream (i.e. a checkpoint).
|
||||||
RangeKept =
|
RangeKept =
|
||||||
case maps:get(StreamRef, Checkpoints) of
|
case maps:get(StreamRef, Checkpoints) of
|
||||||
CP when CP > Until ->
|
Range ->
|
||||||
|
[checkpoint_range(Range)];
|
||||||
|
_Previous ->
|
||||||
discard_range(Range),
|
discard_range(Range),
|
||||||
[];
|
[]
|
||||||
Until ->
|
|
||||||
[checkpoint_range(Range)]
|
|
||||||
end,
|
end,
|
||||||
%% Since we're (intentionally) not using transactions here, it's important to
|
%% Since we're (intentionally) not using transactions here, it's important to
|
||||||
%% issue database writes in the same order in which ranges are stored: from
|
%% issue database writes in the same order in which ranges are stored: from
|
||||||
|
|
Loading…
Reference in New Issue