Bound decoded forward fill per chunk in streaming read loop
The inter-segment back-pressure gate was sized for WAV byte density, so it let a 4 MB Opus segment (~100 s at 320 kbps) decode eagerly into main-process RAM, OOMing the tab with HW accel off. The read loop now drains per chunk once the scheduler is past high-water, gated on playback start. Also adds load-generation diagnostics for the double-load hypothesis.
This commit is contained in:
@@ -53,6 +53,14 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
|
|||||||
private readonly ILogger<StreamingAudioPlayerService> _logger;
|
private readonly ILogger<StreamingAudioPlayerService> _logger;
|
||||||
private string? _currentTrackId;
|
private string? _currentTrackId;
|
||||||
|
|
||||||
|
// Monotonic load-generation counter (diagnostic). Incremented on every LoadTrackStreaming entry and
|
||||||
|
// stamped into the load's logs so two loads for ONE user play action — the "Duration set from header
|
||||||
|
// logged twice" double-load hypothesis that needs in-browser confirmation — are unmistakable: a
|
||||||
|
// single play should show exactly one "Streaming load #N started"/"finished" pair. If two overlapping
|
||||||
|
// starts appear for one click, the generation ids pin the re-entrancy. Cheap (an int per load) and
|
||||||
|
// never gates behavior.
|
||||||
|
private int _loadGeneration;
|
||||||
|
|
||||||
// The delivery format the active load resolved to (Phase 18). Captured once per LoadTrackStreaming and
|
// The delivery format the active load resolved to (Phase 18). Captured once per LoadTrackStreaming and
|
||||||
// reused by the seek-beyond-buffer re-fetch so the Range continuation requests the SAME artifact the
|
// reused by the seek-beyond-buffer re-fetch so the Range continuation requests the SAME artifact the
|
||||||
// initial stream did — a seek must never switch formats mid-track (the JS decoder, the cached setup
|
// initial stream did — a seek must never switch formats mid-track (the JS decoder, the cached setup
|
||||||
@@ -164,6 +172,11 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
|
|||||||
// the single-instance JS StreamDecoder.
|
// the single-instance JS StreamDecoder.
|
||||||
await ResetToIdle();
|
await ResetToIdle();
|
||||||
|
|
||||||
|
// Stamp this load with a fresh generation id (diagnostic — see _loadGeneration). Logged at
|
||||||
|
// start and finish so a double-load shows as two overlapping start/finish pairs for one play.
|
||||||
|
var loadGeneration = ++_loadGeneration;
|
||||||
|
_logger.LogInformation("Streaming load #{Gen} started for track {TrackId}", loadGeneration, track.EntryKey);
|
||||||
|
|
||||||
// Save track ID for seek operations
|
// Save track ID for seek operations
|
||||||
_currentTrackId = track.EntryKey;
|
_currentTrackId = track.EntryKey;
|
||||||
// A fresh load is a fresh play candidate (§1d: replays = multiple plays). Arm the
|
// A fresh load is a fresh play candidate (§1d: replays = multiple plays). Arm the
|
||||||
@@ -315,6 +328,8 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
|
|||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
IsLoading = false;
|
IsLoading = false;
|
||||||
|
_logger.LogInformation("Streaming load #{Gen} finished for track {TrackId} (superseded={Superseded})",
|
||||||
|
loadGeneration, track.EntryKey, !ReferenceEquals(_streamingCancellation, loadCts));
|
||||||
// Only notify if this load is still the active operation. A superseding seek
|
// Only notify if this load is still the active operation. A superseding seek
|
||||||
// owns state notifications; firing here mid-seek would push a stale snapshot.
|
// owns state notifications; firing here mid-seek would push a stale snapshot.
|
||||||
if (ReferenceEquals(_streamingCancellation, loadCts))
|
if (ReferenceEquals(_streamingCancellation, loadCts))
|
||||||
@@ -519,8 +534,6 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
|
|||||||
CanStartStreaming = chunkResult.CanStartStreaming;
|
CanStartStreaming = chunkResult.CanStartStreaming;
|
||||||
HeaderParsed = chunkResult.HeaderParsed;
|
HeaderParsed = chunkResult.HeaderParsed;
|
||||||
BufferedChunks = chunkResult.BufferCount;
|
BufferedChunks = chunkResult.BufferCount;
|
||||||
// chunkResult.ProductionPaused is informational only on this path — back-pressure
|
|
||||||
// granularity is one segment (the inter-segment gate below), not per-chunk.
|
|
||||||
|
|
||||||
// Set duration from header when available (only set once)
|
// Set duration from header when available (only set once)
|
||||||
if (chunkResult.Duration.HasValue && Duration == null)
|
if (chunkResult.Duration.HasValue && Duration == null)
|
||||||
@@ -576,6 +589,29 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
|
|||||||
}
|
}
|
||||||
|
|
||||||
await ThrottledNotifyStateChanged();
|
await ThrottledNotifyStateChanged();
|
||||||
|
|
||||||
|
// Per-chunk back-pressure — the bound that actually holds for high-density codecs.
|
||||||
|
// The inter-segment gate alone is matched to WAV's byte density (~24 s of audio per
|
||||||
|
// 4 MB segment) but NOT to Opus: at 320 kbps a 4 MB segment is ~100 s of decodable
|
||||||
|
// audio. The inner loop has the whole segment's bytes already in hand, so with no
|
||||||
|
// network wait to pace it, it would decode the ENTIRE segment eagerly — piling tens
|
||||||
|
// of MB of decoded f32 PCM AHEAD of a playhead that has barely moved, before the
|
||||||
|
// inter-segment gate ever runs. With HW accel off that lookahead lives in main-
|
||||||
|
// process RAM, and the byte ceiling cannot save us because nothing on this path
|
||||||
|
// polls it. So drain to low-water per chunk once the scheduler is over high-water.
|
||||||
|
//
|
||||||
|
// Gated on _streamingPlaybackStarted so this can NEVER block first audio (C2): until
|
||||||
|
// playback starts the playhead does not advance, so the forward fill would never
|
||||||
|
// drain and the loop would deadlock. The 30 s high-water sits far above the
|
||||||
|
// 6-buffer playback-start minimum, so in practice the gate is not even reached
|
||||||
|
// before playback begins — the guard is the correctness backstop, not the common
|
||||||
|
// case. Reads the piggybacked flag (no extra interop hop) to DECIDE to drain; the
|
||||||
|
// drain helper then polls IsProductionPaused — the same steady-state-reads-flag /
|
||||||
|
// throttled-state-polls split the inter-segment gate uses.
|
||||||
|
if (_streamingPlaybackStarted && chunkResult.ProductionPaused)
|
||||||
|
{
|
||||||
|
await DrainBackpressureAsync(cancellationToken);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} while (currentBytes > 0);
|
} while (currentBytes > 0);
|
||||||
|
|
||||||
@@ -608,19 +644,16 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
|
|||||||
$"but expected up to {SegmentSizeBytes} and have not reached EOF");
|
$"but expected up to {SegmentSizeBytes} and have not reached EOF");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Inter-segment back-pressure gate (Phase 21.2 fill signal, now gating SEGMENT FETCH
|
// Inter-segment back-pressure gate (Phase 21.2 fill signal, gating SEGMENT FETCH). Do not
|
||||||
// instead of pacing ReadAsync on an open stream). Do not fetch the next segment while
|
// fetch the next segment while the scheduler is over high-water; wait until it drains
|
||||||
// the scheduler is over high-water; wait until it drains below low-water. Because the
|
// below low-water. Because the browser only buffers bounded segments and we hold off
|
||||||
// browser only buffers bounded segments and we hold off requesting the next one, raw
|
// requesting the next one, raw network memory stays at ~one segment. Shares the same
|
||||||
// network memory stays at ~one segment. The poll awaits on cancellationToken, so a
|
// drain helper as the per-chunk gate above. No _streamingPlaybackStarted guard is needed
|
||||||
// track switch/seek mid-wait throws OCE and unwinds through the existing drain
|
// here (unlike the per-chunk gate): reaching this point means a full segment was consumed,
|
||||||
// discipline (C6). UC5: a user pause freezes the playhead so the fill never drains —
|
// which is ~24 s (WAV) / ~100 s (Opus) of audio — far past the 6-buffer playback-start
|
||||||
// hold here until playback resumes (IsPaused clears) OR the fill drains on its own.
|
// minimum — so playback is always running by now and the fill can drain. A file that fits
|
||||||
while (IsPaused || await _audioInterop.IsProductionPaused(PlayerId))
|
// in one segment hits EOF and breaks above, never reaching this gate.
|
||||||
{
|
await DrainBackpressureAsync(cancellationToken);
|
||||||
cancellationToken.ThrowIfCancellationRequested();
|
|
||||||
await Task.Delay(BackpressurePollMs, cancellationToken);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fetch the next bounded segment. The end offset is clamped implicitly by the server
|
// Fetch the next bounded segment. The end offset is clamped implicitly by the server
|
||||||
// (a request past EOF yields the available tail as a short slice, caught above).
|
// (a request past EOF yields the available tail as a short slice, caught above).
|
||||||
@@ -991,6 +1024,27 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Block the segment loop while the scheduler's decoded forward fill is over high-water, resuming
|
||||||
|
/// once it drains below low-water (Phase 21.2 hysteresis). Shared by the per-chunk gate (inside a
|
||||||
|
/// segment) and the inter-segment gate so both honor identical drain discipline — a guard present on
|
||||||
|
/// one path and absent on the other would let one path overshoot the memory bound.
|
||||||
|
/// <para>
|
||||||
|
/// The poll awaits on <paramref name="cancellationToken"/>, so a track switch/seek mid-wait throws
|
||||||
|
/// OCE and unwinds through the existing drain discipline (C6). UC5: a user pause freezes the playhead
|
||||||
|
/// so the fill never drains on its own — hold here until playback resumes (IsPaused clears) OR the
|
||||||
|
/// fill drains. Returns immediately when nothing is throttled (the steady-state common case).
|
||||||
|
/// </para>
|
||||||
|
/// </summary>
|
||||||
|
private async Task DrainBackpressureAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
while (IsPaused || await _audioInterop.IsProductionPaused(PlayerId))
|
||||||
|
{
|
||||||
|
cancellationToken.ThrowIfCancellationRequested();
|
||||||
|
await Task.Delay(BackpressurePollMs, cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private async Task ThrottledNotifyStateChanged()
|
private async Task ThrottledNotifyStateChanged()
|
||||||
{
|
{
|
||||||
var now = DateTime.UtcNow;
|
var now = DateTime.UtcNow;
|
||||||
|
|||||||
@@ -61,6 +61,16 @@ export class StreamDecoder {
|
|||||||
// at 4 GB by the 32-bit RIFF size field, so overflow is not a practical concern.
|
// at 4 GB by the 32-bit RIFF size field, so overflow is not a practical concern.
|
||||||
private totalRawBytes: number = 0;
|
private totalRawBytes: number = 0;
|
||||||
private processedBytes: number = 0;
|
private processedBytes: number = 0;
|
||||||
|
|
||||||
|
// Absolute count of raw bytes already DROPPED off the front of rawChunks (the memory bound).
|
||||||
|
// processedBytes is an absolute cursor into the whole logical byte stream; rawChunks no longer
|
||||||
|
// begins at stream byte 0 once consumed chunks are compacted away, so extractAlignedData walks
|
||||||
|
// from discardedBytes (the absolute position of rawChunks[0]) rather than 0. totalRawBytes and
|
||||||
|
// every offset stay absolute and unchanged — only the array's front moves. Without this, a long
|
||||||
|
// WAV (e.g. a 92-min mix ≈ 970 MB raw) accumulates its ENTIRE raw stream body in rawChunks
|
||||||
|
// because consumed chunks were never released; Phase 21.2 bounds only the DECODED scheduler
|
||||||
|
// queue, not this raw queue — so software (HW-accel-off) playback crashed the tab on memory.
|
||||||
|
private discardedBytes: number = 0;
|
||||||
private totalStreamLength: number = 0;
|
private totalStreamLength: number = 0;
|
||||||
private streamComplete: boolean = false;
|
private streamComplete: boolean = false;
|
||||||
private headerError: string | null = null;
|
private headerError: string | null = null;
|
||||||
@@ -94,6 +104,7 @@ export class StreamDecoder {
|
|||||||
this.rawChunks = [];
|
this.rawChunks = [];
|
||||||
this.totalRawBytes = 0;
|
this.totalRawBytes = 0;
|
||||||
this.processedBytes = 0;
|
this.processedBytes = 0;
|
||||||
|
this.discardedBytes = 0;
|
||||||
this.totalStreamLength = totalStreamLength;
|
this.totalStreamLength = totalStreamLength;
|
||||||
this.streamComplete = false;
|
this.streamComplete = false;
|
||||||
this.headerBytesReceived = 0;
|
this.headerBytesReceived = 0;
|
||||||
@@ -228,6 +239,36 @@ export class StreamDecoder {
|
|||||||
this.totalRawBytes += data.length;
|
this.totalRawBytes += data.length;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Drop fully-consumed raw chunks off the front of rawChunks, reclaiming their bytes. A chunk is
|
||||||
|
* droppable only when its ENTIRE span lies at or before processedBytes (the decode cursor); a
|
||||||
|
* chunk that straddles the cursor still has unconsumed tail bytes a later segment will read, so
|
||||||
|
* the walk stops there. discardedBytes tracks the absolute start of rawChunks[0] so
|
||||||
|
* extractAlignedData keeps reading the correct bytes after compaction. Splicing once at the end
|
||||||
|
* (not per chunk) means one array splice per call instead of one per dropped chunk.
|
||||||
|
*
|
||||||
|
* This is the raw-side analogue of PlaybackScheduler.evictPlayedBuffers (the decoded side): both
|
||||||
|
* keep their queue bounded to roughly the live window, so a long stream never balloons memory.
|
||||||
|
*/
|
||||||
|
private releaseConsumedChunks(): void {
|
||||||
|
let dropCount = 0;
|
||||||
|
let frontPos = this.discardedBytes;
|
||||||
|
for (const chunk of this.rawChunks) {
|
||||||
|
// Drop only when the whole chunk is behind the cursor (end <= processedBytes). A chunk
|
||||||
|
// ending exactly at processedBytes has every byte consumed and is safe to drop.
|
||||||
|
if (frontPos + chunk.length <= this.processedBytes) {
|
||||||
|
frontPos += chunk.length;
|
||||||
|
dropCount++;
|
||||||
|
} else {
|
||||||
|
break; // this chunk straddles the cursor (or is ahead) — stop.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (dropCount > 0) {
|
||||||
|
this.rawChunks.splice(0, dropCount);
|
||||||
|
this.discardedBytes = frontPos;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Try to decode the next segment of audio.
|
* Try to decode the next segment of audio.
|
||||||
*
|
*
|
||||||
@@ -276,6 +317,9 @@ export class StreamDecoder {
|
|||||||
// Advance only after a successful decode so a thrown timeout/decode
|
// Advance only after a successful decode so a thrown timeout/decode
|
||||||
// failure does not silently drop the segment.
|
// failure does not silently drop the segment.
|
||||||
this.processedBytes += alignedSize;
|
this.processedBytes += alignedSize;
|
||||||
|
// Release fully-consumed raw chunks now that the cursor has moved past them. This is the
|
||||||
|
// memory bound: without it rawChunks retains the whole stream body (the OOM on long WAVs).
|
||||||
|
this.releaseConsumedChunks();
|
||||||
return { buffer, duration: buffer.duration };
|
return { buffer, duration: buffer.duration };
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
// Re-throw typed errors so the outer drain loop in processChunk /
|
// Re-throw typed errors so the outer drain loop in processChunk /
|
||||||
@@ -339,7 +383,9 @@ export class StreamDecoder {
|
|||||||
let extractedOffset = 0;
|
let extractedOffset = 0;
|
||||||
let remaining = size;
|
let remaining = size;
|
||||||
let streamPosition = this.processedBytes;
|
let streamPosition = this.processedBytes;
|
||||||
let currentPos = 0;
|
// rawChunks[0] now begins at absolute stream byte `discardedBytes` (front-compaction has
|
||||||
|
// dropped everything before it), so the walk starts there, not at 0.
|
||||||
|
let currentPos = this.discardedBytes;
|
||||||
|
|
||||||
for (const chunk of this.rawChunks) {
|
for (const chunk of this.rawChunks) {
|
||||||
if (remaining <= 0) break;
|
if (remaining <= 0) break;
|
||||||
@@ -473,6 +519,7 @@ export class StreamDecoder {
|
|||||||
this.rawChunks = [];
|
this.rawChunks = [];
|
||||||
this.totalRawBytes = 0;
|
this.totalRawBytes = 0;
|
||||||
this.processedBytes = 0;
|
this.processedBytes = 0;
|
||||||
|
this.discardedBytes = 0;
|
||||||
this.totalStreamLength = 0;
|
this.totalStreamLength = 0;
|
||||||
this.streamComplete = false;
|
this.streamComplete = false;
|
||||||
this.headerBytesReceived = 0;
|
this.headerBytesReceived = 0;
|
||||||
@@ -501,6 +548,7 @@ export class StreamDecoder {
|
|||||||
this.rawChunks = [];
|
this.rawChunks = [];
|
||||||
this.totalRawBytes = 0;
|
this.totalRawBytes = 0;
|
||||||
this.processedBytes = 0;
|
this.processedBytes = 0;
|
||||||
|
this.discardedBytes = 0;
|
||||||
this.streamComplete = false;
|
this.streamComplete = false;
|
||||||
this.headerBytesReceived = 0;
|
this.headerBytesReceived = 0;
|
||||||
this.headerSearchChunks = [];
|
this.headerSearchChunks = [];
|
||||||
|
|||||||
Reference in New Issue
Block a user