From 518479e7ae1aede531147c7a81368620fddcd031 Mon Sep 17 00:00:00 2001 From: daniel-c-harvey Date: Tue, 23 Jun 2026 23:16:08 -0400 Subject: [PATCH] Phase 21.2: back-pressure to bound the unplayed decoded region Shared scheduler fill signal (forward water-marks + hard byte cap) pauses the C# read loop above high-water and, for Opus, stops the demux/decode feed so WebCodecs queues stay near-empty. Routes through the existing cancellation discipline; releases the latch on clear/seek. --- .../Services/AudioInteropService.cs | 24 ++++ .../Services/StreamingAudioPlayerService.cs | 28 +++++ DeepDrftPublic/Interop/audio/AudioPlayer.ts | 30 ++++- .../Interop/audio/OpusStreamDecoder.test.ts | 59 +++++++++ .../Interop/audio/OpusStreamDecoder.ts | 72 ++++++++++- .../Interop/audio/PlaybackScheduler.test.ts | 116 ++++++++++++++++++ .../Interop/audio/PlaybackScheduler.ts | 107 ++++++++++++++++ DeepDrftPublic/Interop/audio/index.ts | 8 ++ 8 files changed, 436 insertions(+), 8 deletions(-) diff --git a/DeepDrftPublic.Client/Services/AudioInteropService.cs b/DeepDrftPublic.Client/Services/AudioInteropService.cs index 966fd99..d463aa3 100644 --- a/DeepDrftPublic.Client/Services/AudioInteropService.cs +++ b/DeepDrftPublic.Client/Services/AudioInteropService.cs @@ -159,6 +159,25 @@ public class AudioInteropService : IAsyncDisposable } } + /// + /// Phase 21.2a back-pressure poll: ask whether the scheduler is still over its forward + /// high/low-water band. The read loop calls this only WHILE already throttled, to learn when it + /// may resume reading — the steady-state loop reads the piggybacked ProductionPaused flag + /// off each chunk result instead. Defaults to false on any interop failure so a torn-down player + /// never wedges a loop that is exiting anyway. + /// + public async Task IsProductionPaused(string playerId) + { + try + { + return await _jsRuntime.InvokeAsync("DeepDrftAudio.isProductionPaused", playerId); + } + catch + { + return false; + } + } + public async Task ReinitializeFromOffset(string playerId, long totalStreamLength, double seekPosition) { return await InvokeJsAsync("DeepDrftAudio.reinitializeFromOffset", playerId, totalStreamLength, seekPosition); @@ -419,6 +438,11 @@ public class StreamingResult : AudioOperationResult public bool HeaderParsed { get; set; } public int BufferCount { get; set; } public double? Duration { get; set; } // Duration in seconds calculated from WAV header + + // Phase 21.2a back-pressure: true when the scheduler's forward decoded fill is over the + // high-water mark and the C# read loop should stop calling ReadAsync until it drains. Read off + // the chunk result the loop already awaits — no extra interop hop in the unthrottled steady state. + public bool ProductionPaused { get; set; } } public class AudioPlayerState diff --git a/DeepDrftPublic.Client/Services/StreamingAudioPlayerService.cs b/DeepDrftPublic.Client/Services/StreamingAudioPlayerService.cs index 9b6f856..71346b0 100644 --- a/DeepDrftPublic.Client/Services/StreamingAudioPlayerService.cs +++ b/DeepDrftPublic.Client/Services/StreamingAudioPlayerService.cs @@ -16,6 +16,14 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS // Adaptive chunk sizing private const int MinBufferSize = 16 * 1024; // 16KB minimum private const int MaxBufferSize = 64 * 1024; // 64KB maximum + + // Phase 21.2a back-pressure poll interval. While the scheduler is over its forward high-water + // mark, the read loop stops calling ReadAsync and polls IsProductionPaused at this cadence + // until the fill drains below low-water. 100 ms is well under the low-water lookahead (seconds), + // so resume is prompt relative to the playhead — no starvation (AC3) — while keeping the poll + // cheap. The poll honors the loop's cancellation token, so a track switch/seek during a pause + // exits through the same drain discipline as a pause during ReadAsync (C6). + private const int BackpressurePollMs = 100; private int _currentBufferSize = DefaultBufferSize; private int _consecutiveSlowReads = 0; @@ -465,6 +473,26 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS } await ThrottledNotifyStateChanged(); + + // Phase 21.2a back-pressure (serves BOTH paths). The chunk we just processed + // reported the scheduler's forward fill is over the high-water mark — stop + // reading the socket so the unplayed decoded region stays bounded. Pausing + // ReadAsync lets the kernel TCP window close (we are working WITH transport flow + // control, not against it). Poll until the fill drains below low-water, then + // resume the loop. For WAV this is the whole story (StreamDecoder decodes + // synchronously into the scheduler); the Opus feed additionally self-throttles + // its demux/decode off the SAME signal (21.2b), so its upstream queues stay + // near-empty too. The poll awaits on cancellationToken, so a track switch/seek + // mid-pause throws OCE and unwinds through the existing drain discipline (C6) — + // no separate cancellation path, no stale read racing a reinit. + if (chunkResult.ProductionPaused) + { + while (await _audioInterop.IsProductionPaused(PlayerId)) + { + cancellationToken.ThrowIfCancellationRequested(); + await Task.Delay(BackpressurePollMs, cancellationToken); + } + } } } while (currentBytes > 0); diff --git a/DeepDrftPublic/Interop/audio/AudioPlayer.ts b/DeepDrftPublic/Interop/audio/AudioPlayer.ts index 06ce602..9e6f2e1 100644 --- a/DeepDrftPublic/Interop/audio/AudioPlayer.ts +++ b/DeepDrftPublic/Interop/audio/AudioPlayer.ts @@ -30,6 +30,10 @@ export interface StreamingResult extends AudioResult { headerParsed?: boolean; bufferCount?: number; duration?: number; + // Phase 21.2a back-pressure signal piggybacked on the chunk result the C# read loop already + // awaits — true means the scheduler's forward fill is over the high-water mark and the loop + // should stop calling ReadAsync until it drains (no extra interop hop in the common case). + productionPaused?: boolean; } export interface AudioState { @@ -133,7 +137,14 @@ export class AudioPlayer { // selects Opus when the sidecar parsed, so the null branch is defensive. if (this.isOpusContentType(contentType) && this.pendingOpusSidecar) { this.activeOpusSidecar = this.pendingOpusSidecar; - this.opusDecoder = new OpusStreamDecoder(this.contextManager, this.pendingOpusSidecar); + // Pass the shared back-pressure signal (21.2b): the Opus decoder stops demuxing/ + // decoding new packets while the scheduler is full, so the WebCodecs decode queue + // and decodedQueue do not balloon behind a throttled socket (OQ7). Same signal the + // C# read loop honors — one policy, two thin hooks. + this.opusDecoder = new OpusStreamDecoder( + this.contextManager, + this.pendingOpusSidecar, + () => this.scheduler.isProductionPaused()); return { success: true }; } @@ -263,7 +274,8 @@ export class AudioPlayer { canStartStreaming: canStart, headerParsed, bufferCount: this.scheduler.getBufferCount(), - duration: this.duration + duration: this.duration, + productionPaused: this.scheduler.isProductionPaused() }; } catch (error) { return { success: false, error: (error as Error).message }; @@ -307,7 +319,8 @@ export class AudioPlayer { canStartStreaming: canStart, headerParsed: this.streamDecoder.headerParsed, bufferCount: this.scheduler.getBufferCount(), - duration: this.duration + duration: this.duration, + productionPaused: this.scheduler.isProductionPaused() }; } catch (error) { return { success: false, error: (error as Error).message }; @@ -508,6 +521,17 @@ export class AudioPlayer { return this.scheduler.getTotalDuration() + this.scheduler.getPlaybackOffset(); } + /** + * The shared back-pressure signal (Phase 21.2a), polled by the C# read loop WHILE it is + * already throttled to learn when the forward fill has drained below the low-water mark and it + * may resume reading. The steady-state (unthrottled) loop never calls this — it reads the + * piggybacked productionPaused flag off each chunk result instead, so there is no extra + * interop hop until back-pressure actually engages. + */ + isProductionPaused(): boolean { + return this.scheduler.isProductionPaused(); + } + /** * Calculate byte offset for a time position (for C# layer) */ diff --git a/DeepDrftPublic/Interop/audio/OpusStreamDecoder.test.ts b/DeepDrftPublic/Interop/audio/OpusStreamDecoder.test.ts index 44f716b..da9d72e 100644 --- a/DeepDrftPublic/Interop/audio/OpusStreamDecoder.test.ts +++ b/DeepDrftPublic/Interop/audio/OpusStreamDecoder.test.ts @@ -448,6 +448,65 @@ test('OpusStreamDecoder.totalDuration is null when the sidecar carries no positi assertEqual(decoder.totalDuration, null, 'no positive duration -> null'); }); +// --- Phase 21.2b: Opus decode-ahead back-pressure (the stash-while-full half) ------------------ +// +// When the shared scheduler is full, push() must NOT demux/decode ahead — it stashes the raw bytes +// and returns nothing, so the WebCodecs decode queue and decodedQueue stay near-empty (OQ7). The +// stash-while-full branch returns BEFORE ensureConfigured(), so it is testable without WebCodecs +// (no AudioDecoder is constructed). The drain-on-resume path needs the real WebCodecs decoder and +// stays browser-verified; here we pin the bound itself and the lifecycle resets. + +// Access the private stash for white-box assertions (same idiom the scheduler tests use). +function stashLength(decoder: OpusStreamDecoder): number { + return (decoder as unknown as { pendingBytes: Uint8Array[] }).pendingBytes.length; +} + +// The stash-while-full branch returns synchronously at the top of push() (before any real await), +// so the stash is observable immediately without awaiting the returned promise — keeping these +// tests inside the synchronous inline harness (which does not await test bodies). +test('push stashes bytes and decodes nothing while the scheduler is full (no decode-ahead)', () => { + const sidecar = sidecarFrom({ + setupHeader: [0x4f, 0x70, 0x75, 0x73, 0x48, 0x65, 0x61, 0x64], + totalByteLength: 500_000, totalDuration: 100, preSkip: 312, + points: [{ granule: 312, byteOffset: 4096 }], + }); + // Scheduler reports "full" → push must short-circuit before touching WebCodecs. + const decoder = new OpusStreamDecoder(stubContextManager, sidecar, () => true); + + void decoder.push(new Uint8Array([1, 2, 3])); + void decoder.push(new Uint8Array([4, 5])); + + assertEqual(stashLength(decoder), 2, 'both chunks stashed in arrival order'); + assertEqual(decoder.ready, false, 'decoder not even configured while throttled'); +}); + +test('reinitializeForRangeContinuation drops the pre-seek stash (C6 — no stale feed across reset)', () => { + const sidecar = sidecarFrom({ + setupHeader: [0x4f, 0x70, 0x75, 0x73, 0x48, 0x65, 0x61, 0x64], + totalByteLength: 500_000, totalDuration: 100, preSkip: 312, + points: [{ granule: 312, byteOffset: 4096 }], + }); + const decoder = new OpusStreamDecoder(stubContextManager, sidecar, () => true); + void decoder.push(new Uint8Array([1, 2, 3])); // stash one chunk while full + assertEqual(stashLength(decoder), 1, 'one chunk stashed pre-seek'); + + decoder.reinitializeForRangeContinuation(0, 5); // a seek + assertEqual(stashLength(decoder), 0, 'pre-seek stash dropped on range-continuation'); +}); + +test('dispose clears the stash', () => { + const sidecar = sidecarFrom({ + setupHeader: [0x4f, 0x70, 0x75, 0x73, 0x48, 0x65, 0x61, 0x64], + totalByteLength: 500_000, totalDuration: 100, preSkip: 312, + points: [{ granule: 312, byteOffset: 4096 }], + }); + const decoder = new OpusStreamDecoder(stubContextManager, sidecar, () => true); + void decoder.push(new Uint8Array([9])); + assertEqual(stashLength(decoder), 1, 'stashed'); + decoder.dispose(); + assertEqual(stashLength(decoder), 0, 'stash cleared on dispose'); +}); + function concat(arrs: Uint8Array[]): Uint8Array { let len = 0; for (const a of arrs) len += a.length; diff --git a/DeepDrftPublic/Interop/audio/OpusStreamDecoder.ts b/DeepDrftPublic/Interop/audio/OpusStreamDecoder.ts index 01c61f2..a3db462 100644 --- a/DeepDrftPublic/Interop/audio/OpusStreamDecoder.ts +++ b/DeepDrftPublic/Interop/audio/OpusStreamDecoder.ts @@ -39,6 +39,17 @@ const MAX_PACKET_FRAMES = 5760; export class OpusStreamDecoder implements IStreamingDecoder { private readonly contextManager: AudioContextManager; private readonly sidecar: OpusSeekData; + // Phase 21.2b back-pressure hook: returns true when the shared scheduler is full (forward fill + // over high-water). While full, push() stashes raw bytes WITHOUT demuxing/decoding so the + // WebCodecs decode queue and decodedQueue stay near-empty behind a throttled socket (OQ7). + // Null = no back-pressure (e.g. unit tests), in which case the decoder feeds eagerly as before. + private readonly isSchedulerFull: (() => boolean) | null; + + // Raw bytes received while the scheduler was full, held undemuxed until it drains. The C# read + // loop also pauses above high-water, so this stash is bounded to at most the in-flight chunks + // between the loop reading the productionPaused flag and actually stopping — a handful of KB, + // not a decode-ahead. Drained (demuxed + decoded) on the next push once below high-water. + private pendingBytes: Uint8Array[] = []; private demuxer = new OggDemuxer(); private decoder: AudioDecoder | null = null; @@ -66,9 +77,13 @@ export class OpusStreamDecoder implements IStreamingDecoder { private emittedFrames = 0; private readonly totalFrames: number; - constructor(contextManager: AudioContextManager, sidecar: OpusSeekData) { + constructor( + contextManager: AudioContextManager, + sidecar: OpusSeekData, + isSchedulerFull: (() => boolean) | null = null) { this.contextManager = contextManager; this.sidecar = sidecar; + this.isSchedulerFull = isSchedulerFull; this.totalFrames = sidecar.totalDurationSeconds > 0 ? Math.round(sidecar.totalDurationSeconds * OPUS_SAMPLE_RATE) : Number.POSITIVE_INFINITY; @@ -136,17 +151,59 @@ export class OpusStreamDecoder implements IStreamingDecoder { async push(chunk: Uint8Array): Promise { if (this.fatalError) return []; + + // 21.2b back-pressure: while the scheduler is full, do NOT demux/decode ahead. Stash the + // raw bytes in arrival order and return nothing — the WebCodecs decode queue and + // decodedQueue stay near-empty (OQ7). The bytes are demuxed/decoded on a later push once + // the scheduler has drained below low-water, in exactly the order received (Ogg demux is + // order-sensitive). configure() is deferred too — no need to spin up the decoder while + // throttled. The C# loop also stops reading above high-water, so the stash stays small. + if (this.isSchedulerFull?.()) { + this.pendingBytes.push(chunk); + return []; + } + if (!(await this.ensureConfigured())) return []; - const packets = this.demuxer.push(chunk); - this.decodePackets(packets); + // Drained below high-water: replay any stashed bytes first (preserving stream order), then + // the new chunk, through the demuxer as one contiguous feed. + const out: AudioBuffer[] = []; + if (this.pendingBytes.length > 0) { + const stashed = this.pendingBytes; + this.pendingBytes = []; + for (const bytes of stashed) { + this.decodePackets(this.demuxer.push(bytes)); + } + } + + this.decodePackets(this.demuxer.push(chunk)); // Wait until the WebCodecs decoder has processed the queued packets before draining. await this.yieldToDecoder(); - return this.drainDecoded(); + out.push(...this.drainDecoded()); + return out; } async complete(): Promise { - if (this.fatalError || !this.decoder || this.decoder.state !== 'configured') { + if (this.fatalError) { + return this.drainDecoded(); + } + + // End-of-stream may arrive while still throttled with bytes stashed (e.g. a short track + // that finished sending before the scheduler drained). Configure if needed and replay the + // stash so the tail is decoded before flush — otherwise the final seconds would be lost. + if (this.pendingBytes.length > 0) { + if (await this.ensureConfigured()) { + const stashed = this.pendingBytes; + this.pendingBytes = []; + for (const bytes of stashed) { + this.decodePackets(this.demuxer.push(bytes)); + } + } else { + this.pendingBytes = []; + } + } + + if (!this.decoder || this.decoder.state !== 'configured') { return this.drainDecoded(); } try { @@ -184,6 +241,10 @@ export class OpusStreamDecoder implements IStreamingDecoder { // continuation mode (treat the first page's packets as audio — no setup pages in a 206 body). this.demuxer.reset(true); this.decodedQueue = []; + // Drop any bytes stashed by back-pressure: they belong to the PRE-seek stream position and + // must never be replayed against the post-seek (range-continuation) demux state (C6 — no + // stale feed racing the reset). + this.pendingBytes = []; this.emittedFrames = 0; // post-seek buffers are positioned by the scheduler's playbackOffset // Arm the lead trim: skip enough decoded frames to land at targetTimeSeconds, not at // landingTimeSeconds (the page start). Clamp to ≥0 to guard against floating-point rounding. @@ -199,6 +260,7 @@ export class OpusStreamDecoder implements IStreamingDecoder { try { d.close(); } catch { /* already closed */ } } this.decodedQueue = []; + this.pendingBytes = []; if (this.decoder && this.decoder.state !== 'closed') { try { this.decoder.close(); } catch { /* already closed */ } } diff --git a/DeepDrftPublic/Interop/audio/PlaybackScheduler.test.ts b/DeepDrftPublic/Interop/audio/PlaybackScheduler.test.ts index ec75d0e..8395147 100644 --- a/DeepDrftPublic/Interop/audio/PlaybackScheduler.test.ts +++ b/DeepDrftPublic/Interop/audio/PlaybackScheduler.test.ts @@ -85,6 +85,14 @@ function buf(duration: number): AudioBuffer { return { duration } as AudioBuffer; } +/** + * A decoded buffer carrying realistic byte-footprint fields (length + numberOfChannels) for the + * OQ3 byte-ceiling test. Models 48 kHz stereo float PCM: length = duration × 48000 frames, 2 ch. + */ +function bufBytes(duration: number): AudioBuffer { + return { duration, length: Math.round(duration * 48000), numberOfChannels: 2 } as AudioBuffer; +} + function makeScheduler(cm: FakeContextManager): PlaybackScheduler { // The scheduler only uses the subset FakeContextManager implements. return new PlaybackScheduler(cm as unknown as AudioContextManager); @@ -325,6 +333,114 @@ test('eviction via handleSourceEnded: position exact, live bufferIndex decrement } }); +// === Phase 21.2 back-pressure: the forward water-mark signal ================================= +// +// The signal is pure given the clock + buffer durations + the playhead position, so it is +// testable in Node with the same fakes. We drive forward lookahead by adding buffers (fill) and +// advancing the clock (drain), and assert the hysteresis latch and the OQ3 byte ceiling. + +/** + * Fill the scheduler with `count` 1 s buffers, start playback at t=0, and advance the schedule + * cursor to the end so nextBufferIndex does not pin anything. Leaves all `count` buffers decoded + * and the playhead at the clock position the caller sets afterwards. + */ +function fillAndStart(s: PlaybackScheduler, cm: FakeContextManager, count: number): void { + for (let i = 0; i < count; i++) s.addBuffer(buf(1)); + cm.now = 0; + s.playFromPosition(0); + advanceCursorToEnd(s, cm); +} + +// High-water reached → production pauses; the signal reflects the forward lookahead. +test('isProductionPaused latches true when forward lookahead reaches high-water', () => { + const cm = new FakeContextManager(); + const s = makeScheduler(cm); + s.setForwardWindow(10, 5, 0); // high 10s, low 5s, byte cap disabled + fillAndStart(s, cm, 40); // 40s decoded, track [0,40) + + cm.now = 0; // playhead at 0 → forward lookahead = 40s ≥ 10s high-water + assertEqual(s.getForwardLookaheadSeconds(), 40, 'lookahead is full decoded tail at t=0'); + assertEqual(s.isProductionPaused(), true, 'pauses at/above high-water'); +}); + +// Below high-water but above low-water while NOT yet paused → stays unpaused (no premature pause). +test('isProductionPaused stays false in the hysteresis band before the high-water crossing', () => { + const cm = new FakeContextManager(); + const s = makeScheduler(cm); + s.setForwardWindow(10, 5, 0); + fillAndStart(s, cm, 8); // 8s decoded + + cm.now = 0; // lookahead 8s: between low(5) and high(10), never latched → unpaused + assertEqual(s.isProductionPaused(), false, 'no pause until high-water is actually reached'); +}); + +// Hysteresis: once paused at high-water, stays paused through the band until lookahead drains +// below low-water, then resumes. Drain is modeled by advancing the clock (playhead moves forward, +// shrinking forward lookahead). +test('isProductionPaused holds through the band and resumes only below low-water', () => { + const cm = new FakeContextManager(); + const s = makeScheduler(cm); + s.setForwardWindow(10, 5, 0); + fillAndStart(s, cm, 40); // track [0,40) + + cm.now = 0; + assertEqual(s.isProductionPaused(), true, 'latched at high-water (40s ahead)'); + + // Playhead at 32 → lookahead 8s: in the band (5..10) → must STAY paused (hysteresis). + cm.now = 32; + assertEqual(s.getForwardLookaheadSeconds(), 8, 'lookahead drained to 8s'); + assertEqual(s.isProductionPaused(), true, 'still paused inside the band'); + + // Playhead at 36 → lookahead 4s ≤ low-water 5 → resume. + cm.now = 36; + assertEqual(s.getForwardLookaheadSeconds(), 4, 'lookahead below low-water'); + assertEqual(s.isProductionPaused(), false, 'resumes below low-water'); + + // Refill back over high-water re-latches (the next chunk would re-pause). + for (let i = 0; i < 20; i++) s.addBuffer(buf(1)); // +20s decoded ahead + assertEqual(s.isProductionPaused(), true, 're-latches when fill exceeds high-water again'); +}); + +// OQ3 hard byte ceiling pauses production independent of the time window, and releases as soon as +// the footprint is back under the cap (no separate low-water band on the byte guard). +test('OQ3 byte ceiling pauses regardless of the time window', () => { + const cm = new FakeContextManager(); + const s = makeScheduler(cm); + // Each 1s buffer here is 48000 frames × 2 ch × 4 bytes = 384000 bytes. Cap at ~1.5 MB ≈ 4 buffers. + const perBuffer = 48000 * 2 * 4; + s.setForwardWindow(1000, 500, perBuffer * 4); // time window huge so only the byte cap can fire + for (let i = 0; i < 6; i++) s.addBuffer(bufBytes(1)); // 6 buffers > 4-buffer cap + cm.now = 0; + s.playFromPosition(0); + advanceCursorToEnd(s, cm); + + cm.now = 0; + if (s.getDecodedByteEstimate() <= perBuffer * 4) { + throw new Error('test setup: byte estimate should exceed the cap'); + } + assertEqual(s.isProductionPaused(), true, 'byte ceiling pauses even with a huge time window'); +}); + +// clear() / clearForSeek() release the latch so a fresh stream/seek starts unthrottled (C2). +test('clear and clearForSeek release the back-pressure latch (C2 latency parity)', () => { + const cm = new FakeContextManager(); + const s = makeScheduler(cm); + s.setForwardWindow(10, 5, 0); + fillAndStart(s, cm, 40); + cm.now = 0; + assertEqual(s.isProductionPaused(), true, 'latched'); + + s.clear(); + // After clear there are no buffers, lookahead is 0, and the latch is reset → unpaused. + assertEqual(s.isProductionPaused(), false, 'clear resets the latch and empties fill'); + + fillAndStart(s, cm, 40); + cm.now = 0; + assertEqual(s.isProductionPaused(), true, 'latched again after refill'); + s.clearForSeek(); + assertEqual(s.isProductionPaused(), false, 'clearForSeek resets the latch'); +}); + // --- run ------------------------------------------------------------------------------------- if (failures.length > 0) { console.error(failures.join('\n')); diff --git a/DeepDrftPublic/Interop/audio/PlaybackScheduler.ts b/DeepDrftPublic/Interop/audio/PlaybackScheduler.ts index 975dc45..f0043fb 100644 --- a/DeepDrftPublic/Interop/audio/PlaybackScheduler.ts +++ b/DeepDrftPublic/Interop/audio/PlaybackScheduler.ts @@ -36,6 +36,32 @@ import { AudioContextManager } from './AudioContextManager.js'; */ const DEFAULT_BACK_RETAIN_SECONDS = 10; +/** + * Forward back-pressure water-marks (Phase 21.2 — the bound on the *unplayed* region). + * + * The single back-pressure signal is the scheduler's decoded forward lookahead: how many + * seconds of decoded audio sit AHEAD of the playhead (OQ7). Production (the C# read loop and, + * for Opus, the demux/decode feed) pauses above the high-water mark and resumes below the + * low-water mark — classic hysteresis so the two producers do not chatter on/off per chunk. + * + * Provisional time-based defaults (OQ1 — 21.4 tunes them): + * - HIGH (30 s): the most decoded lookahead we hold ahead of the playhead before throttling. + * Comfortably above the playback-start minimum (6 buffers ≈ a second or two), so C2 holds — + * first audio never waits on a throttle (the high-water is reached only well after playback + * is already running). + * - LOW (15 s): resume producing here. Kept generous so the forward fill never drains to the + * ~500 ms scheduler lookahead under normal network jitter (AC3 — no starvation). + * + * OQ3 hard memory ceiling: an absolute byte cap on total decoded float held, independent of the + * time window. This is the guard-rail that makes "1 GB never OOMs" a guarantee rather than a + * tuning hope — if a pathological stream packs an unusual amount of decoded audio into the time + * window, the byte cap still pauses production. Estimated as channels × frames × 4 bytes (f32). + */ +const DEFAULT_FORWARD_HIGH_WATER_SECONDS = 30; +const DEFAULT_FORWARD_LOW_WATER_SECONDS = 15; +const DEFAULT_MAX_DECODED_BYTES = 96 * 1024 * 1024; // ~96 MB of decoded float PCM +const BYTES_PER_FLOAT_SAMPLE = 4; + interface ScheduledSource { source: AudioBufferSourceNode; bufferIndex: number; @@ -66,6 +92,19 @@ export class PlaybackScheduler { // 21.2 will drive this from the window policy. Not a hardcoded eviction decision. private backRetainSeconds: number = DEFAULT_BACK_RETAIN_SECONDS; + // Forward back-pressure water-marks + the OQ3 hard byte ceiling (Phase 21.2). This is the + // single shared window policy (OQ6): both producers read isProductionPaused() and honor it + // in their own way — the C# read loop stops ReadAsync, the Opus feed stops demux/decode. + private forwardHighWaterSeconds: number = DEFAULT_FORWARD_HIGH_WATER_SECONDS; + private forwardLowWaterSeconds: number = DEFAULT_FORWARD_LOW_WATER_SECONDS; + private maxDecodedBytes: number = DEFAULT_MAX_DECODED_BYTES; + + // Hysteresis latch for the production pause. Once forward fill crosses the high-water mark we + // stay paused until it drains below the low-water mark, so the two producers do not flap + // on/off around a single threshold (and a paused producer does not resume for one chunk only + // to re-pause immediately). False until first crossing; flips on the band edges. + private productionPaused_: boolean = false; + // Callbacks public onPlaybackEnded: (() => void) | null = null; @@ -132,6 +171,68 @@ export class PlaybackScheduler { this.backRetainSeconds = Math.max(0, seconds); } + /** + * Configure the forward back-pressure water-marks (seconds of decoded lookahead) and the OQ3 + * hard byte ceiling. Provisional config seam — 21.4 tunes the numbers. Low is clamped below + * high so the hysteresis band is always valid; non-positive byte cap disables the OQ3 guard. + */ + setForwardWindow(highWaterSeconds: number, lowWaterSeconds: number, maxDecodedBytes: number): void { + this.forwardHighWaterSeconds = Math.max(0, highWaterSeconds); + this.forwardLowWaterSeconds = Math.max(0, Math.min(lowWaterSeconds, this.forwardHighWaterSeconds)); + this.maxDecodedBytes = maxDecodedBytes; + } + + /** + * Seconds of decoded audio sitting AHEAD of the current playhead — the forward fill. This is + * the single back-pressure signal (OQ7): the absolute end time of the last decoded buffer + * minus the current playback position. Never negative (clamped at 0 when the playhead has + * caught up to or passed the decoded tail). + */ + getForwardLookaheadSeconds(): number { + const decodedEnd = this.getTotalDuration() + this.playbackOffset; + return Math.max(0, decodedEnd - this.getCurrentPosition()); + } + + /** + * Estimated bytes of decoded float PCM currently retained (OQ3 input). Web Audio AudioBuffers + * are 32-bit float per sample per channel; frames = duration × sampleRate. Summed across the + * retained buffers only — evicted buffers are already reclaimed, so this tracks the live + * footprint, not the whole track. + */ + getDecodedByteEstimate(): number { + let bytes = 0; + for (const b of this.buffers) { + bytes += b.length * b.numberOfChannels * BYTES_PER_FLOAT_SAMPLE; + } + return bytes; + } + + /** + * The single shared production-pause decision (Phase 21.2, OQ6/OQ7). Both producers — the C# + * read loop (21.2a) and the Opus demux/decode feed (21.2b) — call this and stop producing + * while it returns true. Hysteresis: pause when forward lookahead exceeds the high-water mark + * OR the decoded byte estimate exceeds the OQ3 ceiling; resume only once forward lookahead has + * drained below the low-water mark AND the byte estimate is back under the ceiling. The + * byte-ceiling test has no separate low-water band — it is the hard guard rail, so it releases + * as soon as eviction brings the footprint back under the cap. + */ + isProductionPaused(): boolean { + const lookahead = this.getForwardLookaheadSeconds(); + const overByteCeiling = this.maxDecodedBytes > 0 && this.getDecodedByteEstimate() > this.maxDecodedBytes; + + if (this.productionPaused_) { + // Stay paused until BOTH the time window has drained below low-water AND the byte + // footprint is back under the ceiling. + if (lookahead <= this.forwardLowWaterSeconds && !overByteCeiling) { + this.productionPaused_ = false; + } + } else if (lookahead >= this.forwardHighWaterSeconds || overByteCeiling) { + this.productionPaused_ = true; + } + + return this.productionPaused_; + } + /** * Drop already-played buffers from the front of the array, reclaiming their decoded float * memory, and advance the time anchor so all position/index bookkeeping stays exact. @@ -418,6 +519,9 @@ export class PlaybackScheduler { this.nextBufferIndex = 0; this.nextScheduleTime = 0; this.playbackOffset = 0; + // Release the back-pressure latch — a fresh stream must start unthrottled so its first + // chunks decode immediately (C2: no throttle-induced first-audio stall). + this.productionPaused_ = false; } /** @@ -432,6 +536,9 @@ export class PlaybackScheduler { this.nextBufferIndex = 0; this.nextScheduleTime = 0; // Note: playbackOffset is NOT reset - it will be set by the caller + // Release the back-pressure latch — the post-seek continuation must refill from the new + // offset without inheriting the pre-seek paused state. + this.productionPaused_ = false; } /** diff --git a/DeepDrftPublic/Interop/audio/index.ts b/DeepDrftPublic/Interop/audio/index.ts index dd863b9..136b329 100644 --- a/DeepDrftPublic/Interop/audio/index.ts +++ b/DeepDrftPublic/Interop/audio/index.ts @@ -117,6 +117,14 @@ const DeepDrftAudio = { return player?.calculateByteOffset(positionSeconds) ?? 0; }, + // Phase 21.2a back-pressure poll: the C# read loop calls this WHILE throttled to learn when + // the scheduler has drained below low-water and reading may resume. A missing player reads as + // "not paused" so a torn-down player never wedges a loop that is already exiting. + isProductionPaused: (playerId: string): boolean => { + const player = audioPlayers.get(playerId); + return player?.isProductionPaused() ?? false; + }, + reinitializeFromOffset: (playerId: string, totalStreamLength: number, seekPosition: number): AudioResult => { const player = audioPlayers.get(playerId); if (!player) return { success: false, error: 'Player not found' };