From aeec58295775d6cf13edaa8486f79728a75697d4 Mon Sep 17 00:00:00 2001 From: daniel-c-harvey Date: Wed, 24 Jun 2026 19:50:33 -0400 Subject: [PATCH] Bound decoded forward fill per chunk in streaming read loop The inter-segment back-pressure gate matched WAV byte density but let a 4MB Opus segment (~100s at 320kbps) decode eagerly into main-process RAM, OOMing the tab with HW accel off. Drain per chunk past high-water, gated on playback start. Adds load-generation diagnostics for the double-load hypothesis. --- .../Services/StreamingAudioPlayerService.cs | 84 +++++++++++++++---- DeepDrftPublic/Interop/audio/StreamDecoder.ts | 50 ++++++++++- 2 files changed, 118 insertions(+), 16 deletions(-) diff --git a/DeepDrftPublic.Client/Services/StreamingAudioPlayerService.cs b/DeepDrftPublic.Client/Services/StreamingAudioPlayerService.cs index 5fece45..5722513 100644 --- a/DeepDrftPublic.Client/Services/StreamingAudioPlayerService.cs +++ b/DeepDrftPublic.Client/Services/StreamingAudioPlayerService.cs @@ -53,6 +53,14 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS private readonly ILogger _logger; private string? _currentTrackId; + // Monotonic load-generation counter (diagnostic). Incremented on every LoadTrackStreaming entry and + // stamped into the load's logs so two loads for ONE user play action — the "Duration set from header + // logged twice" double-load hypothesis that needs in-browser confirmation — are unmistakable: a + // single play should show exactly one "Streaming load #N started"/"finished" pair. If two overlapping + // starts appear for one click, the generation ids pin the re-entrancy. Cheap (an int per load) and + // never gates behavior. + private int _loadGeneration; + // The delivery format the active load resolved to (Phase 18). 
Captured once per LoadTrackStreaming and // reused by the seek-beyond-buffer re-fetch so the Range continuation requests the SAME artifact the // initial stream did — a seek must never switch formats mid-track (the JS decoder, the cached setup @@ -164,6 +172,11 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS // the single-instance JS StreamDecoder. await ResetToIdle(); + // Stamp this load with a fresh generation id (diagnostic — see _loadGeneration). Logged at + // start and finish so a double-load shows as two overlapping start/finish pairs for one play. + var loadGeneration = ++_loadGeneration; + _logger.LogInformation("Streaming load #{Gen} started for track {TrackId}", loadGeneration, track.EntryKey); + // Save track ID for seek operations _currentTrackId = track.EntryKey; // A fresh load is a fresh play candidate (§1d: replays = multiple plays). Arm the @@ -315,6 +328,8 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS finally { IsLoading = false; + _logger.LogInformation("Streaming load #{Gen} finished for track {TrackId} (superseded={Superseded})", + loadGeneration, track.EntryKey, !ReferenceEquals(_streamingCancellation, loadCts)); // Only notify if this load is still the active operation. A superseding seek // owns state notifications; firing here mid-seek would push a stale snapshot. if (ReferenceEquals(_streamingCancellation, loadCts)) @@ -519,8 +534,6 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS CanStartStreaming = chunkResult.CanStartStreaming; HeaderParsed = chunkResult.HeaderParsed; BufferedChunks = chunkResult.BufferCount; - // chunkResult.ProductionPaused is informational only on this path — back-pressure - // granularity is one segment (the inter-segment gate below), not per-chunk. 
// Set duration from header when available (only set once) if (chunkResult.Duration.HasValue && Duration == null) @@ -576,6 +589,29 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS } await ThrottledNotifyStateChanged(); + + // Per-chunk back-pressure — the bound that actually holds for high-density codecs. + // The inter-segment gate alone is matched to WAV's byte density (~24 s of audio per + // 4 MB segment) but NOT to Opus: at 320 kbps a 4 MB segment is ~100 s of decodable + // audio. The inner loop has the whole segment's bytes already in hand, so with no + // network wait to pace it, it would decode the ENTIRE segment eagerly — piling tens + // of MB of decoded f32 PCM AHEAD of a playhead that has barely moved, before the + // inter-segment gate ever runs. With HW accel off that lookahead lives in main- + // process RAM, and the byte ceiling cannot save us because nothing on this path + // polls it. So drain to low-water per chunk once the scheduler is over high-water. + // + // Gated on _streamingPlaybackStarted so this can NEVER block first audio (C2): until + // playback starts the playhead does not advance, so the forward fill would never + // drain and the loop would deadlock. The 30 s high-water sits far above the + // 6-buffer playback-start minimum, so in practice the gate is not even reached + // before playback begins — the guard is the correctness backstop, not the common + // case. Reads the piggybacked flag (no extra interop hop) to DECIDE to drain; the + // drain helper then polls IsProductionPaused — the same steady-state-reads-flag / + // throttled-state-polls split the inter-segment gate uses. 
+ if (_streamingPlaybackStarted && chunkResult.ProductionPaused) + { + await DrainBackpressureAsync(cancellationToken); + } } } while (currentBytes > 0); @@ -608,19 +644,16 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS $"but expected up to {SegmentSizeBytes} and have not reached EOF"); } - // Inter-segment back-pressure gate (Phase 21.2 fill signal, now gating SEGMENT FETCH - // instead of pacing ReadAsync on an open stream). Do not fetch the next segment while - // the scheduler is over high-water; wait until it drains below low-water. Because the - // browser only buffers bounded segments and we hold off requesting the next one, raw - // network memory stays at ~one segment. The poll awaits on cancellationToken, so a - // track switch/seek mid-wait throws OCE and unwinds through the existing drain - // discipline (C6). UC5: a user pause freezes the playhead so the fill never drains — - // hold here until playback resumes (IsPaused clears) OR the fill drains on its own. - while (IsPaused || await _audioInterop.IsProductionPaused(PlayerId)) - { - cancellationToken.ThrowIfCancellationRequested(); - await Task.Delay(BackpressurePollMs, cancellationToken); - } + // Inter-segment back-pressure gate (Phase 21.2 fill signal, gating SEGMENT FETCH). Do not + // fetch the next segment while the scheduler is over high-water; wait until it drains + // below low-water. Because the browser only buffers bounded segments and we hold off + // requesting the next one, raw network memory stays at ~one segment. Shares the same + // drain helper as the per-chunk gate above. No _streamingPlaybackStarted guard is needed + // here (unlike the per-chunk gate): reaching this point means a full segment was consumed, + // which is ~24 s (WAV) / ~100 s (Opus) of audio — far past the 6-buffer playback-start + // minimum — so playback is always running by now and the fill can drain. 
A file that fits + // in one segment hits EOF and breaks above, never reaching this gate. + await DrainBackpressureAsync(cancellationToken); // Fetch the next bounded segment. The end offset is clamped implicitly by the server // (a request past EOF yields the available tail as a short slice, caught above). @@ -991,6 +1024,27 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS } } + /// + /// Block the segment loop while the scheduler's decoded forward fill is over high-water, resuming + /// once it drains below low-water (Phase 21.2 hysteresis). Shared by the per-chunk gate (inside a + /// segment) and the inter-segment gate so both honor identical drain discipline — a guard present on + /// one path and absent on the other would let one path overshoot the memory bound. + /// + /// The poll awaits on <paramref name="cancellationToken"/>, so a track switch/seek mid-wait throws + /// OCE and unwinds through the existing drain discipline (C6). UC5: a user pause freezes the playhead + /// so the fill never drains on its own — hold here until playback resumes (IsPaused clears) OR the + /// fill drains. Returns immediately when nothing is throttled (the steady-state common case). + /// + /// + private async Task DrainBackpressureAsync(CancellationToken cancellationToken) + { + while (IsPaused || await _audioInterop.IsProductionPaused(PlayerId)) + { + cancellationToken.ThrowIfCancellationRequested(); + await Task.Delay(BackpressurePollMs, cancellationToken); + } + } + private async Task ThrottledNotifyStateChanged() { var now = DateTime.UtcNow; diff --git a/DeepDrftPublic/Interop/audio/StreamDecoder.ts b/DeepDrftPublic/Interop/audio/StreamDecoder.ts index e4c7943..72a8d0f 100644 --- a/DeepDrftPublic/Interop/audio/StreamDecoder.ts +++ b/DeepDrftPublic/Interop/audio/StreamDecoder.ts @@ -61,6 +61,16 @@ export class StreamDecoder { // at 4 GB by the 32-bit RIFF size field, so overflow is not a practical concern.
private totalRawBytes: number = 0; private processedBytes: number = 0; + + // Absolute count of raw bytes already DROPPED off the front of rawChunks (the memory bound). + // processedBytes is an absolute cursor into the whole logical byte stream; rawChunks no longer + // begins at stream byte 0 once consumed chunks are compacted away, so extractAlignedData walks + // from discardedBytes (the absolute position of rawChunks[0]) rather than 0. totalRawBytes and + // every offset stay absolute and unchanged — only the array's front moves. Without this, a long + // WAV (e.g. a 92-min mix ≈ 970 MB raw) accumulates its ENTIRE decoded-from body in rawChunks + // because consumed chunks were never released; Phase 21.2 bounds only the DECODED scheduler + // queue, not this raw queue — so software (HW-accel-off) playback crashed the tab on memory. + private discardedBytes: number = 0; private totalStreamLength: number = 0; private streamComplete: boolean = false; private headerError: string | null = null; @@ -94,6 +104,7 @@ export class StreamDecoder { this.rawChunks = []; this.totalRawBytes = 0; this.processedBytes = 0; + this.discardedBytes = 0; this.totalStreamLength = totalStreamLength; this.streamComplete = false; this.headerBytesReceived = 0; @@ -228,6 +239,36 @@ export class StreamDecoder { this.totalRawBytes += data.length; } + /** + * Drop fully-consumed raw chunks off the front of rawChunks, reclaiming their bytes. A chunk is + * droppable only when its ENTIRE span lies at or before processedBytes (the decode cursor); a + * chunk that straddles the cursor still has unconsumed tail bytes a later segment will read, so + * the walk stops there. discardedBytes tracks the absolute start of rawChunks[0] so + * extractAlignedData keeps reading the correct bytes after compaction. Splicing once at the end + * (not per chunk) keeps this O(n) in the dropped count. 
+ * + * This is the raw-side analogue of PlaybackScheduler.evictPlayedBuffers (the decoded side): both + * keep their queue bounded to roughly the live window, so a long stream never balloons memory. + */ + private releaseConsumedChunks(): void { + let dropCount = 0; + let frontPos = this.discardedBytes; + for (const chunk of this.rawChunks) { + // Drop only when the whole chunk is behind the cursor (end <= processedBytes). A chunk + // ending exactly at processedBytes has every byte consumed and is safe to drop. + if (frontPos + chunk.length <= this.processedBytes) { + frontPos += chunk.length; + dropCount++; + } else { + break; // this chunk straddles the cursor (or is ahead) — stop. + } + } + if (dropCount > 0) { + this.rawChunks.splice(0, dropCount); + this.discardedBytes = frontPos; + } + } + /** * Try to decode the next segment of audio. * @@ -276,6 +317,9 @@ export class StreamDecoder { // Advance only after a successful decode so a thrown timeout/decode // failure does not silently drop the segment. this.processedBytes += alignedSize; + // Release fully-consumed raw chunks now that the cursor has moved past them. This is the + // memory bound: without it rawChunks retains the whole stream body (the OOM on long WAVs). + this.releaseConsumedChunks(); return { buffer, duration: buffer.duration }; } catch (error) { // Re-throw typed errors so the outer drain loop in processChunk / @@ -339,7 +383,9 @@ export class StreamDecoder { let extractedOffset = 0; let remaining = size; let streamPosition = this.processedBytes; - let currentPos = 0; + // rawChunks[0] now begins at absolute stream byte `discardedBytes` (front-compaction has + // dropped everything before it), so the walk starts there, not at 0. 
+ let currentPos = this.discardedBytes; for (const chunk of this.rawChunks) { if (remaining <= 0) break; @@ -473,6 +519,7 @@ export class StreamDecoder { this.rawChunks = []; this.totalRawBytes = 0; this.processedBytes = 0; + this.discardedBytes = 0; this.totalStreamLength = 0; this.streamComplete = false; this.headerBytesReceived = 0; @@ -501,6 +548,7 @@ export class StreamDecoder { this.rawChunks = []; this.totalRawBytes = 0; this.processedBytes = 0; + this.discardedBytes = 0; this.streamComplete = false; this.headerBytesReceived = 0; this.headerSearchChunks = [];