diff --git a/DeepDrftPublic.Client/Services/AudioInteropService.cs b/DeepDrftPublic.Client/Services/AudioInteropService.cs
index 966fd99..d463aa3 100644
--- a/DeepDrftPublic.Client/Services/AudioInteropService.cs
+++ b/DeepDrftPublic.Client/Services/AudioInteropService.cs
@@ -159,6 +159,25 @@ public class AudioInteropService : IAsyncDisposable
}
}
+ ///
+ /// Phase 21.2a back-pressure poll: ask whether the scheduler is still over its forward
+ /// high/low-water band. The read loop calls this only WHILE already throttled, to learn when it
+ /// may resume reading — the steady-state loop reads the piggybacked ProductionPaused flag
+ /// off each chunk result instead. Defaults to false on any interop failure so a torn-down player
+ /// never wedges a loop that is exiting anyway.
+ ///
+ public async Task IsProductionPaused(string playerId)
+ {
+ try
+ {
+ return await _jsRuntime.InvokeAsync("DeepDrftAudio.isProductionPaused", playerId);
+ }
+ catch
+ {
+ return false;
+ }
+ }
+
public async Task ReinitializeFromOffset(string playerId, long totalStreamLength, double seekPosition)
{
return await InvokeJsAsync("DeepDrftAudio.reinitializeFromOffset", playerId, totalStreamLength, seekPosition);
@@ -419,6 +438,11 @@ public class StreamingResult : AudioOperationResult
public bool HeaderParsed { get; set; }
public int BufferCount { get; set; }
public double? Duration { get; set; } // Duration in seconds calculated from WAV header
+
+ // Phase 21.2a back-pressure: true when the scheduler's forward decoded fill is over the
+ // high-water mark and the C# read loop should stop calling ReadAsync until it drains. Read off
+ // the chunk result the loop already awaits — no extra interop hop in the unthrottled steady state.
+ public bool ProductionPaused { get; set; }
}
public class AudioPlayerState
diff --git a/DeepDrftPublic.Client/Services/StreamingAudioPlayerService.cs b/DeepDrftPublic.Client/Services/StreamingAudioPlayerService.cs
index 9b6f856..71346b0 100644
--- a/DeepDrftPublic.Client/Services/StreamingAudioPlayerService.cs
+++ b/DeepDrftPublic.Client/Services/StreamingAudioPlayerService.cs
@@ -16,6 +16,14 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
// Adaptive chunk sizing
private const int MinBufferSize = 16 * 1024; // 16KB minimum
private const int MaxBufferSize = 64 * 1024; // 64KB maximum
+
+ // Phase 21.2a back-pressure poll interval. While the scheduler is over its forward high-water
+ // mark, the read loop stops calling ReadAsync and polls IsProductionPaused at this cadence
+ // until the fill drains below low-water. 100 ms is well under the low-water lookahead (seconds),
+ // so resume is prompt relative to the playhead — no starvation (AC3) — while keeping the poll
+ // cheap. The poll honors the loop's cancellation token, so a track switch/seek during a pause
+ // exits through the same drain discipline as a pause during ReadAsync (C6).
+ private const int BackpressurePollMs = 100;
private int _currentBufferSize = DefaultBufferSize;
private int _consecutiveSlowReads = 0;
@@ -465,6 +473,26 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
}
await ThrottledNotifyStateChanged();
+
+ // Phase 21.2a back-pressure (serves BOTH paths). The chunk we just processed
+ // reported the scheduler's forward fill is over the high-water mark — stop
+ // reading the socket so the unplayed decoded region stays bounded. Pausing
+ // ReadAsync lets the kernel TCP window close (we are working WITH transport flow
+ // control, not against it). Poll until the fill drains below low-water, then
+ // resume the loop. For WAV this is the whole story (StreamDecoder decodes
+ // synchronously into the scheduler); the Opus feed additionally self-throttles
+ // its demux/decode off the SAME signal (21.2b), so its upstream queues stay
+ // near-empty too. The poll awaits on cancellationToken, so a track switch/seek
+ // mid-pause throws OCE and unwinds through the existing drain discipline (C6) —
+ // no separate cancellation path, no stale read racing a reinit.
+ if (chunkResult.ProductionPaused)
+ {
+ while (await _audioInterop.IsProductionPaused(PlayerId))
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+ await Task.Delay(BackpressurePollMs, cancellationToken);
+ }
+ }
}
} while (currentBytes > 0);
diff --git a/DeepDrftPublic/Interop/audio/AudioPlayer.ts b/DeepDrftPublic/Interop/audio/AudioPlayer.ts
index 06ce602..9e6f2e1 100644
--- a/DeepDrftPublic/Interop/audio/AudioPlayer.ts
+++ b/DeepDrftPublic/Interop/audio/AudioPlayer.ts
@@ -30,6 +30,10 @@ export interface StreamingResult extends AudioResult {
headerParsed?: boolean;
bufferCount?: number;
duration?: number;
+ // Phase 21.2a back-pressure signal piggybacked on the chunk result the C# read loop already
+ // awaits — true means the scheduler's forward fill is over the high-water mark and the loop
+ // should stop calling ReadAsync until it drains (no extra interop hop in the common case).
+ productionPaused?: boolean;
}
export interface AudioState {
@@ -133,7 +137,14 @@ export class AudioPlayer {
// selects Opus when the sidecar parsed, so the null branch is defensive.
if (this.isOpusContentType(contentType) && this.pendingOpusSidecar) {
this.activeOpusSidecar = this.pendingOpusSidecar;
- this.opusDecoder = new OpusStreamDecoder(this.contextManager, this.pendingOpusSidecar);
+ // Pass the shared back-pressure signal (21.2b): the Opus decoder stops demuxing/
+ // decoding new packets while the scheduler is full, so the WebCodecs decode queue
+ // and decodedQueue do not balloon behind a throttled socket (OQ7). Same signal the
+ // C# read loop honors — one policy, two thin hooks.
+ this.opusDecoder = new OpusStreamDecoder(
+ this.contextManager,
+ this.pendingOpusSidecar,
+ () => this.scheduler.isProductionPaused());
return { success: true };
}
@@ -263,7 +274,8 @@ export class AudioPlayer {
canStartStreaming: canStart,
headerParsed,
bufferCount: this.scheduler.getBufferCount(),
- duration: this.duration
+ duration: this.duration,
+ productionPaused: this.scheduler.isProductionPaused()
};
} catch (error) {
return { success: false, error: (error as Error).message };
@@ -307,7 +319,8 @@ export class AudioPlayer {
canStartStreaming: canStart,
headerParsed: this.streamDecoder.headerParsed,
bufferCount: this.scheduler.getBufferCount(),
- duration: this.duration
+ duration: this.duration,
+ productionPaused: this.scheduler.isProductionPaused()
};
} catch (error) {
return { success: false, error: (error as Error).message };
@@ -508,6 +521,17 @@ export class AudioPlayer {
return this.scheduler.getTotalDuration() + this.scheduler.getPlaybackOffset();
}
+ /**
+ * The shared back-pressure signal (Phase 21.2a), polled by the C# read loop WHILE it is
+ * already throttled to learn when the forward fill has drained below the low-water mark and it
+ * may resume reading. The steady-state (unthrottled) loop never calls this — it reads the
+ * piggybacked productionPaused flag off each chunk result instead, so there is no extra
+ * interop hop until back-pressure actually engages.
+ */
+ isProductionPaused(): boolean {
+ return this.scheduler.isProductionPaused();
+ }
+
/**
* Calculate byte offset for a time position (for C# layer)
*/
diff --git a/DeepDrftPublic/Interop/audio/OpusStreamDecoder.test.ts b/DeepDrftPublic/Interop/audio/OpusStreamDecoder.test.ts
index 44f716b..da9d72e 100644
--- a/DeepDrftPublic/Interop/audio/OpusStreamDecoder.test.ts
+++ b/DeepDrftPublic/Interop/audio/OpusStreamDecoder.test.ts
@@ -448,6 +448,65 @@ test('OpusStreamDecoder.totalDuration is null when the sidecar carries no positi
assertEqual(decoder.totalDuration, null, 'no positive duration -> null');
});
+// --- Phase 21.2b: Opus decode-ahead back-pressure (the stash-while-full half) ------------------
+//
+// When the shared scheduler is full, push() must NOT demux/decode ahead — it stashes the raw bytes
+// and returns nothing, so the WebCodecs decode queue and decodedQueue stay near-empty (OQ7). The
+// stash-while-full branch returns BEFORE ensureConfigured(), so it is testable without WebCodecs
+// (no AudioDecoder is constructed). The drain-on-resume path needs the real WebCodecs decoder and
+// stays browser-verified; here we pin the bound itself and the lifecycle resets.
+
+// Access the private stash for white-box assertions (same idiom the scheduler tests use).
+function stashLength(decoder: OpusStreamDecoder): number {
+ return (decoder as unknown as { pendingBytes: Uint8Array[] }).pendingBytes.length;
+}
+
+// The stash-while-full branch returns synchronously at the top of push() (before any real await),
+// so the stash is observable immediately without awaiting the returned promise — keeping these
+// tests inside the synchronous inline harness (which does not await test bodies).
+test('push stashes bytes and decodes nothing while the scheduler is full (no decode-ahead)', () => {
+ const sidecar = sidecarFrom({
+ setupHeader: [0x4f, 0x70, 0x75, 0x73, 0x48, 0x65, 0x61, 0x64],
+ totalByteLength: 500_000, totalDuration: 100, preSkip: 312,
+ points: [{ granule: 312, byteOffset: 4096 }],
+ });
+ // Scheduler reports "full" → push must short-circuit before touching WebCodecs.
+ const decoder = new OpusStreamDecoder(stubContextManager, sidecar, () => true);
+
+ void decoder.push(new Uint8Array([1, 2, 3]));
+ void decoder.push(new Uint8Array([4, 5]));
+
+ assertEqual(stashLength(decoder), 2, 'both chunks stashed in arrival order');
+ assertEqual(decoder.ready, false, 'decoder not even configured while throttled');
+});
+
+test('reinitializeForRangeContinuation drops the pre-seek stash (C6 — no stale feed across reset)', () => {
+ const sidecar = sidecarFrom({
+ setupHeader: [0x4f, 0x70, 0x75, 0x73, 0x48, 0x65, 0x61, 0x64],
+ totalByteLength: 500_000, totalDuration: 100, preSkip: 312,
+ points: [{ granule: 312, byteOffset: 4096 }],
+ });
+ const decoder = new OpusStreamDecoder(stubContextManager, sidecar, () => true);
+ void decoder.push(new Uint8Array([1, 2, 3])); // stash one chunk while full
+ assertEqual(stashLength(decoder), 1, 'one chunk stashed pre-seek');
+
+ decoder.reinitializeForRangeContinuation(0, 5); // a seek
+ assertEqual(stashLength(decoder), 0, 'pre-seek stash dropped on range-continuation');
+});
+
+test('dispose clears the stash', () => {
+ const sidecar = sidecarFrom({
+ setupHeader: [0x4f, 0x70, 0x75, 0x73, 0x48, 0x65, 0x61, 0x64],
+ totalByteLength: 500_000, totalDuration: 100, preSkip: 312,
+ points: [{ granule: 312, byteOffset: 4096 }],
+ });
+ const decoder = new OpusStreamDecoder(stubContextManager, sidecar, () => true);
+ void decoder.push(new Uint8Array([9]));
+ assertEqual(stashLength(decoder), 1, 'stashed');
+ decoder.dispose();
+ assertEqual(stashLength(decoder), 0, 'stash cleared on dispose');
+});
+
function concat(arrs: Uint8Array[]): Uint8Array {
let len = 0;
for (const a of arrs) len += a.length;
diff --git a/DeepDrftPublic/Interop/audio/OpusStreamDecoder.ts b/DeepDrftPublic/Interop/audio/OpusStreamDecoder.ts
index 01c61f2..a3db462 100644
--- a/DeepDrftPublic/Interop/audio/OpusStreamDecoder.ts
+++ b/DeepDrftPublic/Interop/audio/OpusStreamDecoder.ts
@@ -39,6 +39,17 @@ const MAX_PACKET_FRAMES = 5760;
export class OpusStreamDecoder implements IStreamingDecoder {
private readonly contextManager: AudioContextManager;
private readonly sidecar: OpusSeekData;
+ // Phase 21.2b back-pressure hook: returns true when the shared scheduler is full (forward fill
+ // over high-water). While full, push() stashes raw bytes WITHOUT demuxing/decoding so the
+ // WebCodecs decode queue and decodedQueue stay near-empty behind a throttled socket (OQ7).
+ // Null = no back-pressure (e.g. unit tests), in which case the decoder feeds eagerly as before.
+ private readonly isSchedulerFull: (() => boolean) | null;
+
+ // Raw bytes received while the scheduler was full, held undemuxed until it drains. The C# read
+ // loop also pauses above high-water, so this stash is bounded to at most the in-flight chunks
+ // between the loop reading the productionPaused flag and actually stopping — a handful of KB,
+ // not a decode-ahead. Drained (demuxed + decoded) on the next push once below high-water.
+ private pendingBytes: Uint8Array[] = [];
private demuxer = new OggDemuxer();
private decoder: AudioDecoder | null = null;
@@ -66,9 +77,13 @@ export class OpusStreamDecoder implements IStreamingDecoder {
private emittedFrames = 0;
private readonly totalFrames: number;
- constructor(contextManager: AudioContextManager, sidecar: OpusSeekData) {
+ constructor(
+ contextManager: AudioContextManager,
+ sidecar: OpusSeekData,
+ isSchedulerFull: (() => boolean) | null = null) {
this.contextManager = contextManager;
this.sidecar = sidecar;
+ this.isSchedulerFull = isSchedulerFull;
this.totalFrames = sidecar.totalDurationSeconds > 0
? Math.round(sidecar.totalDurationSeconds * OPUS_SAMPLE_RATE)
: Number.POSITIVE_INFINITY;
@@ -136,17 +151,59 @@ export class OpusStreamDecoder implements IStreamingDecoder {
async push(chunk: Uint8Array): Promise {
if (this.fatalError) return [];
+
+ // 21.2b back-pressure: while the scheduler is full, do NOT demux/decode ahead. Stash the
+ // raw bytes in arrival order and return nothing — the WebCodecs decode queue and
+ // decodedQueue stay near-empty (OQ7). The bytes are demuxed/decoded on a later push once
+ // the scheduler has drained below low-water, in exactly the order received (Ogg demux is
+ // order-sensitive). configure() is deferred too — no need to spin up the decoder while
+ // throttled. The C# loop also stops reading above high-water, so the stash stays small.
+ if (this.isSchedulerFull?.()) {
+ this.pendingBytes.push(chunk);
+ return [];
+ }
+
if (!(await this.ensureConfigured())) return [];
- const packets = this.demuxer.push(chunk);
- this.decodePackets(packets);
+ // Drained below high-water: replay any stashed bytes first (preserving stream order), then
+ // the new chunk, through the demuxer as one contiguous feed.
+ const out: AudioBuffer[] = [];
+ if (this.pendingBytes.length > 0) {
+ const stashed = this.pendingBytes;
+ this.pendingBytes = [];
+ for (const bytes of stashed) {
+ this.decodePackets(this.demuxer.push(bytes));
+ }
+ }
+
+ this.decodePackets(this.demuxer.push(chunk));
// Wait until the WebCodecs decoder has processed the queued packets before draining.
await this.yieldToDecoder();
- return this.drainDecoded();
+ out.push(...this.drainDecoded());
+ return out;
}
async complete(): Promise {
- if (this.fatalError || !this.decoder || this.decoder.state !== 'configured') {
+ if (this.fatalError) {
+ return this.drainDecoded();
+ }
+
+ // End-of-stream may arrive while still throttled with bytes stashed (e.g. a short track
+ // that finished sending before the scheduler drained). Configure if needed and replay the
+ // stash so the tail is decoded before flush — otherwise the final seconds would be lost.
+ if (this.pendingBytes.length > 0) {
+ if (await this.ensureConfigured()) {
+ const stashed = this.pendingBytes;
+ this.pendingBytes = [];
+ for (const bytes of stashed) {
+ this.decodePackets(this.demuxer.push(bytes));
+ }
+ } else {
+ this.pendingBytes = [];
+ }
+ }
+
+ if (!this.decoder || this.decoder.state !== 'configured') {
return this.drainDecoded();
}
try {
@@ -184,6 +241,10 @@ export class OpusStreamDecoder implements IStreamingDecoder {
// continuation mode (treat the first page's packets as audio — no setup pages in a 206 body).
this.demuxer.reset(true);
this.decodedQueue = [];
+ // Drop any bytes stashed by back-pressure: they belong to the PRE-seek stream position and
+ // must never be replayed against the post-seek (range-continuation) demux state (C6 — no
+ // stale feed racing the reset).
+ this.pendingBytes = [];
this.emittedFrames = 0; // post-seek buffers are positioned by the scheduler's playbackOffset
// Arm the lead trim: skip enough decoded frames to land at targetTimeSeconds, not at
// landingTimeSeconds (the page start). Clamp to ≥0 to guard against floating-point rounding.
@@ -199,6 +260,7 @@ export class OpusStreamDecoder implements IStreamingDecoder {
try { d.close(); } catch { /* already closed */ }
}
this.decodedQueue = [];
+ this.pendingBytes = [];
if (this.decoder && this.decoder.state !== 'closed') {
try { this.decoder.close(); } catch { /* already closed */ }
}
diff --git a/DeepDrftPublic/Interop/audio/PlaybackScheduler.test.ts b/DeepDrftPublic/Interop/audio/PlaybackScheduler.test.ts
index ec75d0e..8395147 100644
--- a/DeepDrftPublic/Interop/audio/PlaybackScheduler.test.ts
+++ b/DeepDrftPublic/Interop/audio/PlaybackScheduler.test.ts
@@ -85,6 +85,14 @@ function buf(duration: number): AudioBuffer {
return { duration } as AudioBuffer;
}
+/**
+ * A decoded buffer carrying realistic byte-footprint fields (length + numberOfChannels) for the
+ * OQ3 byte-ceiling test. Models 48 kHz stereo float PCM: length = duration × 48000 frames, 2 ch.
+ */
+function bufBytes(duration: number): AudioBuffer {
+ return { duration, length: Math.round(duration * 48000), numberOfChannels: 2 } as AudioBuffer;
+}
+
function makeScheduler(cm: FakeContextManager): PlaybackScheduler {
// The scheduler only uses the subset FakeContextManager implements.
return new PlaybackScheduler(cm as unknown as AudioContextManager);
@@ -325,6 +333,114 @@ test('eviction via handleSourceEnded: position exact, live bufferIndex decrement
}
});
+// === Phase 21.2 back-pressure: the forward water-mark signal =================================
+//
+// The signal is pure given the clock + buffer durations + the playhead position, so it is
+// testable in Node with the same fakes. We drive forward lookahead by adding buffers (fill) and
+// advancing the clock (drain), and assert the hysteresis latch and the OQ3 byte ceiling.
+
+/**
+ * Fill the scheduler with `count` 1 s buffers, start playback at t=0, and advance the schedule
+ * cursor to the end so nextBufferIndex does not pin anything. Leaves all `count` buffers decoded
+ * and the playhead at the clock position the caller sets afterwards.
+ */
+function fillAndStart(s: PlaybackScheduler, cm: FakeContextManager, count: number): void {
+ for (let i = 0; i < count; i++) s.addBuffer(buf(1));
+ cm.now = 0;
+ s.playFromPosition(0);
+ advanceCursorToEnd(s, cm);
+}
+
+// High-water reached → production pauses; the signal reflects the forward lookahead.
+test('isProductionPaused latches true when forward lookahead reaches high-water', () => {
+ const cm = new FakeContextManager();
+ const s = makeScheduler(cm);
+ s.setForwardWindow(10, 5, 0); // high 10s, low 5s, byte cap disabled
+ fillAndStart(s, cm, 40); // 40s decoded, track [0,40)
+
+ cm.now = 0; // playhead at 0 → forward lookahead = 40s ≥ 10s high-water
+ assertEqual(s.getForwardLookaheadSeconds(), 40, 'lookahead is full decoded tail at t=0');
+ assertEqual(s.isProductionPaused(), true, 'pauses at/above high-water');
+});
+
+// Below high-water but above low-water while NOT yet paused → stays unpaused (no premature pause).
+test('isProductionPaused stays false in the hysteresis band before the high-water crossing', () => {
+ const cm = new FakeContextManager();
+ const s = makeScheduler(cm);
+ s.setForwardWindow(10, 5, 0);
+ fillAndStart(s, cm, 8); // 8s decoded
+
+ cm.now = 0; // lookahead 8s: between low(5) and high(10), never latched → unpaused
+ assertEqual(s.isProductionPaused(), false, 'no pause until high-water is actually reached');
+});
+
+// Hysteresis: once paused at high-water, stays paused through the band until lookahead drains
+// below low-water, then resumes. Drain is modeled by advancing the clock (playhead moves forward,
+// shrinking forward lookahead).
+test('isProductionPaused holds through the band and resumes only below low-water', () => {
+ const cm = new FakeContextManager();
+ const s = makeScheduler(cm);
+ s.setForwardWindow(10, 5, 0);
+ fillAndStart(s, cm, 40); // track [0,40)
+
+ cm.now = 0;
+ assertEqual(s.isProductionPaused(), true, 'latched at high-water (40s ahead)');
+
+ // Playhead at 32 → lookahead 8s: in the band (5..10) → must STAY paused (hysteresis).
+ cm.now = 32;
+ assertEqual(s.getForwardLookaheadSeconds(), 8, 'lookahead drained to 8s');
+ assertEqual(s.isProductionPaused(), true, 'still paused inside the band');
+
+ // Playhead at 36 → lookahead 4s ≤ low-water 5 → resume.
+ cm.now = 36;
+ assertEqual(s.getForwardLookaheadSeconds(), 4, 'lookahead below low-water');
+ assertEqual(s.isProductionPaused(), false, 'resumes below low-water');
+
+ // Refill back over high-water re-latches (the next chunk would re-pause).
+ for (let i = 0; i < 20; i++) s.addBuffer(buf(1)); // +20s decoded ahead
+ assertEqual(s.isProductionPaused(), true, 're-latches when fill exceeds high-water again');
+});
+
+// OQ3 hard byte ceiling pauses production independent of the time window, and releases as soon as
+// the footprint is back under the cap (no separate low-water band on the byte guard).
+test('OQ3 byte ceiling pauses regardless of the time window', () => {
+ const cm = new FakeContextManager();
+ const s = makeScheduler(cm);
+ // Each 1s buffer here is 48000 frames × 2 ch × 4 bytes = 384000 bytes. Cap at ~1.5 MB ≈ 4 buffers.
+ const perBuffer = 48000 * 2 * 4;
+ s.setForwardWindow(1000, 500, perBuffer * 4); // time window huge so only the byte cap can fire
+ for (let i = 0; i < 6; i++) s.addBuffer(bufBytes(1)); // 6 buffers > 4-buffer cap
+ cm.now = 0;
+ s.playFromPosition(0);
+ advanceCursorToEnd(s, cm);
+
+ cm.now = 0;
+ if (s.getDecodedByteEstimate() <= perBuffer * 4) {
+ throw new Error('test setup: byte estimate should exceed the cap');
+ }
+ assertEqual(s.isProductionPaused(), true, 'byte ceiling pauses even with a huge time window');
+});
+
+// clear() / clearForSeek() release the latch so a fresh stream/seek starts unthrottled (C2).
+test('clear and clearForSeek release the back-pressure latch (C2 latency parity)', () => {
+ const cm = new FakeContextManager();
+ const s = makeScheduler(cm);
+ s.setForwardWindow(10, 5, 0);
+ fillAndStart(s, cm, 40);
+ cm.now = 0;
+ assertEqual(s.isProductionPaused(), true, 'latched');
+
+ s.clear();
+ // After clear there are no buffers, lookahead is 0, and the latch is reset → unpaused.
+ assertEqual(s.isProductionPaused(), false, 'clear resets the latch and empties fill');
+
+ fillAndStart(s, cm, 40);
+ cm.now = 0;
+ assertEqual(s.isProductionPaused(), true, 'latched again after refill');
+ s.clearForSeek();
+ assertEqual(s.isProductionPaused(), false, 'clearForSeek resets the latch');
+});
+
// --- run -------------------------------------------------------------------------------------
if (failures.length > 0) {
console.error(failures.join('\n'));
diff --git a/DeepDrftPublic/Interop/audio/PlaybackScheduler.ts b/DeepDrftPublic/Interop/audio/PlaybackScheduler.ts
index 975dc45..f0043fb 100644
--- a/DeepDrftPublic/Interop/audio/PlaybackScheduler.ts
+++ b/DeepDrftPublic/Interop/audio/PlaybackScheduler.ts
@@ -36,6 +36,32 @@ import { AudioContextManager } from './AudioContextManager.js';
*/
const DEFAULT_BACK_RETAIN_SECONDS = 10;
+/**
+ * Forward back-pressure water-marks (Phase 21.2 — the bound on the *unplayed* region).
+ *
+ * The single back-pressure signal is the scheduler's decoded forward lookahead: how many
+ * seconds of decoded audio sit AHEAD of the playhead (OQ7). Production (the C# read loop and,
+ * for Opus, the demux/decode feed) pauses above the high-water mark and resumes below the
+ * low-water mark — classic hysteresis so the two producers do not chatter on/off per chunk.
+ *
+ * Provisional time-based defaults (OQ1 — 21.4 tunes them):
+ * - HIGH (30 s): the most decoded lookahead we hold ahead of the playhead before throttling.
+ * Comfortably above the playback-start minimum (6 buffers ≈ a second or two), so C2 holds —
+ * first audio never waits on a throttle (the high-water is reached only well after playback
+ * is already running).
+ * - LOW (15 s): resume producing here. Kept generous so the forward fill never drains to the
+ * ~500 ms scheduler lookahead under normal network jitter (AC3 — no starvation).
+ *
+ * OQ3 hard memory ceiling: an absolute byte cap on total decoded float held, independent of the
+ * time window. This is the guard-rail that makes "1 GB never OOMs" a guarantee rather than a
+ * tuning hope — if a pathological stream packs an unusual amount of decoded audio into the time
+ * window, the byte cap still pauses production. Estimated as channels × frames × 4 bytes (f32).
+ */
+const DEFAULT_FORWARD_HIGH_WATER_SECONDS = 30;
+const DEFAULT_FORWARD_LOW_WATER_SECONDS = 15;
+const DEFAULT_MAX_DECODED_BYTES = 96 * 1024 * 1024; // ~96 MB of decoded float PCM
+const BYTES_PER_FLOAT_SAMPLE = 4;
+
interface ScheduledSource {
source: AudioBufferSourceNode;
bufferIndex: number;
@@ -66,6 +92,19 @@ export class PlaybackScheduler {
// 21.2 will drive this from the window policy. Not a hardcoded eviction decision.
private backRetainSeconds: number = DEFAULT_BACK_RETAIN_SECONDS;
+ // Forward back-pressure water-marks + the OQ3 hard byte ceiling (Phase 21.2). This is the
+ // single shared window policy (OQ6): both producers read isProductionPaused() and honor it
+ // in their own way — the C# read loop stops ReadAsync, the Opus feed stops demux/decode.
+ private forwardHighWaterSeconds: number = DEFAULT_FORWARD_HIGH_WATER_SECONDS;
+ private forwardLowWaterSeconds: number = DEFAULT_FORWARD_LOW_WATER_SECONDS;
+ private maxDecodedBytes: number = DEFAULT_MAX_DECODED_BYTES;
+
+ // Hysteresis latch for the production pause. Once forward fill crosses the high-water mark we
+ // stay paused until it drains below the low-water mark, so the two producers do not flap
+ // on/off around a single threshold (and a paused producer does not resume for one chunk only
+ // to re-pause immediately). False until first crossing; flips on the band edges.
+ private productionPaused_: boolean = false;
+
// Callbacks
public onPlaybackEnded: (() => void) | null = null;
@@ -132,6 +171,68 @@ export class PlaybackScheduler {
this.backRetainSeconds = Math.max(0, seconds);
}
+ /**
+ * Configure the forward back-pressure water-marks (seconds of decoded lookahead) and the OQ3
+ * hard byte ceiling. Provisional config seam — 21.4 tunes the numbers. Low is clamped below
+ * high so the hysteresis band is always valid; non-positive byte cap disables the OQ3 guard.
+ */
+ setForwardWindow(highWaterSeconds: number, lowWaterSeconds: number, maxDecodedBytes: number): void {
+ this.forwardHighWaterSeconds = Math.max(0, highWaterSeconds);
+ this.forwardLowWaterSeconds = Math.max(0, Math.min(lowWaterSeconds, this.forwardHighWaterSeconds));
+ this.maxDecodedBytes = maxDecodedBytes;
+ }
+
+ /**
+ * Seconds of decoded audio sitting AHEAD of the current playhead — the forward fill. This is
+ * the single back-pressure signal (OQ7): the absolute end time of the last decoded buffer
+ * minus the current playback position. Never negative (clamped at 0 when the playhead has
+ * caught up to or passed the decoded tail).
+ */
+ getForwardLookaheadSeconds(): number {
+ const decodedEnd = this.getTotalDuration() + this.playbackOffset;
+ return Math.max(0, decodedEnd - this.getCurrentPosition());
+ }
+
+ /**
+ * Estimated bytes of decoded float PCM currently retained (OQ3 input). Web Audio AudioBuffers
+ * are 32-bit float per sample per channel; frames = duration × sampleRate. Summed across the
+ * retained buffers only — evicted buffers are already reclaimed, so this tracks the live
+ * footprint, not the whole track.
+ */
+ getDecodedByteEstimate(): number {
+ let bytes = 0;
+ for (const b of this.buffers) {
+ bytes += b.length * b.numberOfChannels * BYTES_PER_FLOAT_SAMPLE;
+ }
+ return bytes;
+ }
+
+ /**
+ * The single shared production-pause decision (Phase 21.2, OQ6/OQ7). Both producers — the C#
+ * read loop (21.2a) and the Opus demux/decode feed (21.2b) — call this and stop producing
+ * while it returns true. Hysteresis: pause when forward lookahead exceeds the high-water mark
+ * OR the decoded byte estimate exceeds the OQ3 ceiling; resume only once forward lookahead has
+ * drained below the low-water mark AND the byte estimate is back under the ceiling. The
+ * byte-ceiling test has no separate low-water band — it is the hard guard rail, so it releases
+ * as soon as eviction brings the footprint back under the cap.
+ */
+ isProductionPaused(): boolean {
+ const lookahead = this.getForwardLookaheadSeconds();
+ const overByteCeiling = this.maxDecodedBytes > 0 && this.getDecodedByteEstimate() > this.maxDecodedBytes;
+
+ if (this.productionPaused_) {
+ // Stay paused until BOTH the time window has drained below low-water AND the byte
+ // footprint is back under the ceiling.
+ if (lookahead <= this.forwardLowWaterSeconds && !overByteCeiling) {
+ this.productionPaused_ = false;
+ }
+ } else if (lookahead >= this.forwardHighWaterSeconds || overByteCeiling) {
+ this.productionPaused_ = true;
+ }
+
+ return this.productionPaused_;
+ }
+
/**
* Drop already-played buffers from the front of the array, reclaiming their decoded float
* memory, and advance the time anchor so all position/index bookkeeping stays exact.
@@ -418,6 +519,9 @@ export class PlaybackScheduler {
this.nextBufferIndex = 0;
this.nextScheduleTime = 0;
this.playbackOffset = 0;
+ // Release the back-pressure latch — a fresh stream must start unthrottled so its first
+ // chunks decode immediately (C2: no throttle-induced first-audio stall).
+ this.productionPaused_ = false;
}
/**
@@ -432,6 +536,9 @@ export class PlaybackScheduler {
this.nextBufferIndex = 0;
this.nextScheduleTime = 0;
// Note: playbackOffset is NOT reset - it will be set by the caller
+ // Release the back-pressure latch — the post-seek continuation must refill from the new
+ // offset without inheriting the pre-seek paused state.
+ this.productionPaused_ = false;
}
/**
diff --git a/DeepDrftPublic/Interop/audio/index.ts b/DeepDrftPublic/Interop/audio/index.ts
index dd863b9..136b329 100644
--- a/DeepDrftPublic/Interop/audio/index.ts
+++ b/DeepDrftPublic/Interop/audio/index.ts
@@ -117,6 +117,14 @@ const DeepDrftAudio = {
return player?.calculateByteOffset(positionSeconds) ?? 0;
},
+ // Phase 21.2a back-pressure poll: the C# read loop calls this WHILE throttled to learn when
+ // the scheduler has drained below low-water and reading may resume. A missing player reads as
+ // "not paused" so a torn-down player never wedges a loop that is already exiting.
+ isProductionPaused: (playerId: string): boolean => {
+ const player = audioPlayers.get(playerId);
+ return player?.isProductionPaused() ?? false;
+ },
+
reinitializeFromOffset: (playerId: string, totalStreamLength: number, seekPosition: number): AudioResult => {
const player = audioPlayers.get(playerId);
if (!player) return { success: false, error: 'Player not found' };