From 29e8747c696bd5ecd4458c1b15ed049754f786e0 Mon Sep 17 00:00:00 2001 From: daniel-c-harvey Date: Tue, 23 Jun 2026 23:28:42 -0400 Subject: [PATCH] 21.2 review remediation: pause-spin, OQ7 comment, rename, C2 cross-check Skip the back-pressure interop poll while paused (UC5). Document complete() draining the stash in full by design. Rename scheduler isProductionPaused to evaluateProductionPause (latch-advancing); window exposure name unchanged. --- .../Services/StreamingAudioPlayerService.cs | 7 ++++- DeepDrftPublic/Interop/audio/AudioPlayer.ts | 8 +++--- .../Interop/audio/OpusStreamDecoder.ts | 6 ++++ .../Interop/audio/PlaybackScheduler.test.ts | 28 +++++++++---------- .../Interop/audio/PlaybackScheduler.ts | 17 +++++++---- 5 files changed, 42 insertions(+), 24 deletions(-) diff --git a/DeepDrftPublic.Client/Services/StreamingAudioPlayerService.cs b/DeepDrftPublic.Client/Services/StreamingAudioPlayerService.cs index 71346b0..08c0bec 100644 --- a/DeepDrftPublic.Client/Services/StreamingAudioPlayerService.cs +++ b/DeepDrftPublic.Client/Services/StreamingAudioPlayerService.cs @@ -487,7 +487,12 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS // no separate cancellation path, no stale read racing a reinit. if (chunkResult.ProductionPaused) { - while (await _audioInterop.IsProductionPaused(PlayerId)) + // UC5: while the user is paused, the playhead is frozen so forward lookahead + // never shrinks and the poll would spin indefinitely. Wait here until playback + // resumes (IsPaused clears) OR the fill drains on its own. Cancellation is + // unchanged: a track switch/seek/stop while paused still throws OCE and unwinds + // through the existing drain discipline (C6) — no weakening of the cancel path. + while (IsPaused || await _audioInterop.IsProductionPaused(PlayerId)) { cancellationToken.ThrowIfCancellationRequested(); await Task.Delay(BackpressurePollMs, cancellationToken); diff --git a/DeepDrftPublic/Interop/audio/AudioPlayer.ts b/DeepDrftPublic/Interop/audio/AudioPlayer.ts index 9e6f2e1..6fea8dc 100644 --- a/DeepDrftPublic/Interop/audio/AudioPlayer.ts +++ b/DeepDrftPublic/Interop/audio/AudioPlayer.ts @@ -144,7 +144,7 @@ export class AudioPlayer { this.opusDecoder = new OpusStreamDecoder( this.contextManager, this.pendingOpusSidecar, - () => this.scheduler.isProductionPaused()); + () => this.scheduler.evaluateProductionPause()); return { success: true }; } @@ -275,7 +275,7 @@ export class AudioPlayer { headerParsed, bufferCount: this.scheduler.getBufferCount(), duration: this.duration, - productionPaused: this.scheduler.isProductionPaused() + productionPaused: this.scheduler.evaluateProductionPause() }; } catch (error) { return { success: false, error: (error as Error).message }; @@ -320,7 +320,7 @@ export class AudioPlayer { headerParsed: this.streamDecoder.headerParsed, bufferCount: this.scheduler.getBufferCount(), duration: this.duration, - productionPaused: this.scheduler.isProductionPaused() + productionPaused: this.scheduler.evaluateProductionPause() }; } catch (error) { return { success: false, error: (error as Error).message }; @@ -529,7 +529,7 @@ export class AudioPlayer { * interop hop until back-pressure actually engages. */ isProductionPaused(): boolean { - return this.scheduler.isProductionPaused(); + return this.scheduler.evaluateProductionPause(); } /** diff --git a/DeepDrftPublic/Interop/audio/OpusStreamDecoder.ts b/DeepDrftPublic/Interop/audio/OpusStreamDecoder.ts index a3db462..fa05d93 100644 --- a/DeepDrftPublic/Interop/audio/OpusStreamDecoder.ts +++ b/DeepDrftPublic/Interop/audio/OpusStreamDecoder.ts @@ -191,6 +191,12 @@ export class OpusStreamDecoder implements IStreamingDecoder { // End-of-stream may arrive while still throttled with bytes stashed (e.g. a short track // that finished sending before the scheduler drained). Configure if needed and replay the // stash so the tail is decoded before flush — otherwise the final seconds would be lost. + // + // OQ7/AC1-Opus precision note: the stash here is drained in full without a water-mark + // check. This is intentionally correct: the stream has ended — you cannot back-pressure a + // finished stream — and the remainder is tail-only (bounded by whatever the throttled C# + // loop left in flight, which is at most one push() worth of bytes). Adding a water-mark + // gate to complete() would silently drop the track's tail and is therefore wrong. if (this.pendingBytes.length > 0) { if (await this.ensureConfigured()) { const stashed = this.pendingBytes; diff --git a/DeepDrftPublic/Interop/audio/PlaybackScheduler.test.ts b/DeepDrftPublic/Interop/audio/PlaybackScheduler.test.ts index 8395147..edc0a89 100644 --- a/DeepDrftPublic/Interop/audio/PlaybackScheduler.test.ts +++ b/DeepDrftPublic/Interop/audio/PlaybackScheduler.test.ts @@ -352,7 +352,7 @@ function fillAndStart(s: PlaybackScheduler, cm: FakeContextManager, count: numbe } // High-water reached → production pauses; the signal reflects the forward lookahead. -test('isProductionPaused latches true when forward lookahead reaches high-water', () => { +test('evaluateProductionPause latches true when forward lookahead reaches high-water', () => { const cm = new FakeContextManager(); const s = makeScheduler(cm); s.setForwardWindow(10, 5, 0); // high 10s, low 5s, byte cap disabled @@ -360,45 +360,45 @@ test('isProductionPaused latches true when forward lookahead reaches high-water' cm.now = 0; // playhead at 0 → forward lookahead = 40s ≥ 10s high-water assertEqual(s.getForwardLookaheadSeconds(), 40, 'lookahead is full decoded tail at t=0'); - assertEqual(s.isProductionPaused(), true, 'pauses at/above high-water'); + assertEqual(s.evaluateProductionPause(), true, 'pauses at/above high-water'); }); // Below high-water but above low-water while NOT yet paused → stays unpaused (no premature pause). -test('isProductionPaused stays false in the hysteresis band before the high-water crossing', () => { +test('evaluateProductionPause stays false in the hysteresis band before the high-water crossing', () => { const cm = new FakeContextManager(); const s = makeScheduler(cm); s.setForwardWindow(10, 5, 0); fillAndStart(s, cm, 8); // 8s decoded cm.now = 0; // lookahead 8s: between low(5) and high(10), never latched → unpaused - assertEqual(s.isProductionPaused(), false, 'no pause until high-water is actually reached'); + assertEqual(s.evaluateProductionPause(), false, 'no pause until high-water is actually reached'); }); // Hysteresis: once paused at high-water, stays paused through the band until lookahead drains // below low-water, then resumes. Drain is modeled by advancing the clock (playhead moves forward, // shrinking forward lookahead). -test('isProductionPaused holds through the band and resumes only below low-water', () => { +test('evaluateProductionPause holds through the band and resumes only below low-water', () => { const cm = new FakeContextManager(); const s = makeScheduler(cm); s.setForwardWindow(10, 5, 0); fillAndStart(s, cm, 40); // track [0,40) cm.now = 0; - assertEqual(s.isProductionPaused(), true, 'latched at high-water (40s ahead)'); + assertEqual(s.evaluateProductionPause(), true, 'latched at high-water (40s ahead)'); // Playhead at 32 → lookahead 8s: in the band (5..10) → must STAY paused (hysteresis). cm.now = 32; assertEqual(s.getForwardLookaheadSeconds(), 8, 'lookahead drained to 8s'); - assertEqual(s.isProductionPaused(), true, 'still paused inside the band'); + assertEqual(s.evaluateProductionPause(), true, 'still paused inside the band'); // Playhead at 36 → lookahead 4s ≤ low-water 5 → resume. cm.now = 36; assertEqual(s.getForwardLookaheadSeconds(), 4, 'lookahead below low-water'); - assertEqual(s.isProductionPaused(), false, 'resumes below low-water'); + assertEqual(s.evaluateProductionPause(), false, 'resumes below low-water'); // Refill back over high-water re-latches (the next chunk would re-pause). for (let i = 0; i < 20; i++) s.addBuffer(buf(1)); // +20s decoded ahead - assertEqual(s.isProductionPaused(), true, 're-latches when fill exceeds high-water again'); + assertEqual(s.evaluateProductionPause(), true, 're-latches when fill exceeds high-water again'); }); // OQ3 hard byte ceiling pauses production independent of the time window, and releases as soon as @@ -418,7 +418,7 @@ test('OQ3 byte ceiling pauses regardless of the time window', () => { if (s.getDecodedByteEstimate() <= perBuffer * 4) { throw new Error('test setup: byte estimate should exceed the cap'); } - assertEqual(s.isProductionPaused(), true, 'byte ceiling pauses even with a huge time window'); + assertEqual(s.evaluateProductionPause(), true, 'byte ceiling pauses even with a huge time window'); }); // clear() / clearForSeek() release the latch so a fresh stream/seek starts unthrottled (C2). @@ -428,17 +428,17 @@ test('clear and clearForSeek release the back-pressure latch (C2 latency parity) s.setForwardWindow(10, 5, 0); fillAndStart(s, cm, 40); cm.now = 0; - assertEqual(s.isProductionPaused(), true, 'latched'); + assertEqual(s.evaluateProductionPause(), true, 'latched'); s.clear(); // After clear there are no buffers, lookahead is 0, and the latch is reset → unpaused. - assertEqual(s.isProductionPaused(), false, 'clear resets the latch and empties fill'); + assertEqual(s.evaluateProductionPause(), false, 'clear resets the latch and empties fill'); fillAndStart(s, cm, 40); cm.now = 0; - assertEqual(s.isProductionPaused(), true, 'latched again after refill'); + assertEqual(s.evaluateProductionPause(), true, 'latched again after refill'); s.clearForSeek(); - assertEqual(s.isProductionPaused(), false, 'clearForSeek resets the latch'); + assertEqual(s.evaluateProductionPause(), false, 'clearForSeek resets the latch'); }); // --- run ------------------------------------------------------------------------------------- diff --git a/DeepDrftPublic/Interop/audio/PlaybackScheduler.ts b/DeepDrftPublic/Interop/audio/PlaybackScheduler.ts index f0043fb..5379130 100644 --- a/DeepDrftPublic/Interop/audio/PlaybackScheduler.ts +++ b/DeepDrftPublic/Interop/audio/PlaybackScheduler.ts @@ -46,9 +46,10 @@ const DEFAULT_BACK_RETAIN_SECONDS = 10; * * Provisional time-based defaults (OQ1 — 21.4 tunes them): * - HIGH (30 s): the most decoded lookahead we hold ahead of the playhead before throttling. - * Comfortably above the playback-start minimum (6 buffers ≈ a second or two), so C2 holds — - * first audio never waits on a throttle (the high-water is reached only well after playback - * is already running). + * Comfortably above the playback-start minimum (`AudioPlayer.minBuffersForPlayback = 6` + * buffers, each typically 0.06 – 1 s depending on format/chunk size; at most a few seconds + * even at the high end), so C2 holds — first audio never waits on a throttle (the high-water + * is reached only well after playback is already running). * - LOW (15 s): resume producing here. Kept generous so the forward fill never drains to the * ~500 ms scheduler lookahead under normal network jitter (AC3 — no starvation). * @@ -93,7 +94,7 @@ export class PlaybackScheduler { private backRetainSeconds: number = DEFAULT_BACK_RETAIN_SECONDS; // Forward back-pressure water-marks + the OQ3 hard byte ceiling (Phase 21.2). This is the - // single shared window policy (OQ6): both producers read isProductionPaused() and honor it + // single shared window policy (OQ6): both producers call evaluateProductionPause() and honor it // in their own way — the C# read loop stops ReadAsync, the Opus feed stops demux/decode. private forwardHighWaterSeconds: number = DEFAULT_FORWARD_HIGH_WATER_SECONDS; private forwardLowWaterSeconds: number = DEFAULT_FORWARD_LOW_WATER_SECONDS; @@ -103,6 +104,7 @@ export class PlaybackScheduler { // stay paused until it drains below the low-water mark, so the two producers do not flap // on/off around a single threshold (and a paused producer does not resume for one chunk only // to re-pause immediately). False until first crossing; flips on the band edges. + // Mutated by evaluateProductionPause() — named to signal the state-advance on each call. private productionPaused_: boolean = false; // Callbacks @@ -215,8 +217,13 @@ export class PlaybackScheduler { * drained below the low-water mark AND the byte estimate is back under the ceiling. The * byte-ceiling test has no separate low-water band — it is the hard guard rail, so it releases * as soon as eviction brings the footprint back under the cap. + * + * Named `evaluateProductionPause` (not `isProductionPaused`) because each call may ADVANCE the + * hysteresis latch (`productionPaused_`), making it a state-advancing evaluation, not a pure + * read. `AudioPlayer.isProductionPaused()` is the pure-predicate wrapper exposed to callers + * outside the scheduler. */ - isProductionPaused(): boolean { + evaluateProductionPause(): boolean { const lookahead = this.getForwardLookaheadSeconds(); const overByteCeiling = this.maxDecodedBytes > 0 && this.getDecodedByteEstimate() > this.maxDecodedBytes;