Files
deepdrft/DeepDrftPublic/Interop/audio/OpusStreamDecoder.ts
T
daniel-c-harvey 518479e7ae Phase 21.2: back-pressure to bound the unplayed decoded region
Shared scheduler fill signal (forward water-marks + hard byte cap) pauses
the C# read loop above high-water and, for Opus, stops the demux/decode
feed so WebCodecs queues stay near-empty. Routes through the existing
cancellation discipline; releases the latch on clear/seek.
2026-06-23 23:16:08 -04:00

378 lines
18 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* OpusStreamDecoder - the WebCodecs streaming Opus decode pipeline.
*
* This replaces the fundamentally-broken per-segment `decodeAudioData` Opus model. Instead of cutting
* the Ogg stream into page-runs and decoding each as a standalone file (which re-applies pre-skip and
* starts from cold codec state at every boundary), it feeds a single stateful WebCodecs `AudioDecoder`
* the demuxed Opus packets in order — correct pre-skip-once handling and full inter-frame continuity.
*
* Pipeline: OggDemuxer (pages -> Opus packets + granule) -> AudioDecoder (codec 'opus', configured
* from the OpusHead in the sidecar) -> AudioData (48 kHz PCM) -> AudioBuffer -> PlaybackScheduler.
*
* Pre-skip (encoder delay): handled ONCE, by the decoder. WebCodecs decodes Opus with the OpusHead
* passed as `AudioDecoderConfig.description`; the OpusHead carries `pre_skip`, and the WebCodecs Opus
* decoder discards those leading samples itself. We do NOT re-trim per packet — doing so on top of the
* decoder's own trim would double-count. This is the spec-intended path (W3C WebCodecs Opus registration).
*
* End-trim: the sidecar's `totalDurationSeconds` is the exact pre-skip-corrected stream length. We cap
* cumulative emitted audio at that length so the final partial frame's padding does not leak past the
* true end. (Granule-position end-trim from the EOS page is the alternative; capping on the known total
* is equivalent and simpler, and the sidecar total is authoritative.)
*
* Sample rate: Opus always decodes at 48 kHz (RFC 7845). We force the AudioContext to 48 kHz at init so
* the decoded AudioData needs no resampling before scheduling — the same `recreateWithSampleRate` seam
* the WAV path uses for non-44.1 sources.
*
* BROWSER-VERIFIED. The actual decode/playback/trim correctness is verified in Daniel's browser
* (WebCodecs cannot run in Node/headless here). The Ogg demux, packet timing, and end-trim *math* are
* unit-tested; the WebCodecs glue (configure/decode/flush/AudioData->AudioBuffer) is browser-verified.
*/
import { AudioContextManager } from './AudioContextManager.js';
import { IStreamingDecoder } from './IStreamingDecoder.js';
import { OggDemuxer, OpusPacket, extractOpusHead, opusHeadChannelCount } from './OggDemuxer.js';
import { OpusSeekData, OPUS_SAMPLE_RATE } from './OpusSidecar.js';
/**
 * Upper bound on the number of frames a single Opus packet can carry: the longest packet lasts
 * 120 ms, which at the 48 kHz Opus rate is 5760 frames. Used only to space out chunk timestamps.
 */
const MAX_PACKET_FRAMES = (120 * 48_000) / 1_000;
/**
 * Streaming Opus decoder: demuxes an Ogg byte stream into Opus packets, feeds them in order to a
 * single stateful WebCodecs `AudioDecoder`, and converts the decoded `AudioData` into
 * `AudioBuffer`s, applying a post-seek lead-trim and an end-trim against the sidecar's total
 * duration. Supports optional back-pressure: while the supplied `isSchedulerFull` callback
 * returns true, incoming bytes are stashed instead of being demuxed/decoded.
 */
export class OpusStreamDecoder implements IStreamingDecoder {
    private readonly contextManager: AudioContextManager;
    private readonly sidecar: OpusSeekData;

    // Phase 21.2b back-pressure hook: returns true when the shared scheduler is full. While full,
    // push() stashes raw bytes WITHOUT demuxing/decoding so the WebCodecs decode queue and
    // decodedQueue stay near-empty. Null = no back-pressure (e.g. unit tests), in which case the
    // decoder feeds eagerly.
    private readonly isSchedulerFull: (() => boolean) | null;

    // Raw bytes received while the scheduler was full, held undemuxed (in arrival order) until a
    // later push()/complete() replays them through the demuxer.
    private pendingBytes: Uint8Array[] = [];

    private demuxer = new OggDemuxer();
    private decoder: AudioDecoder | null = null;
    private channelCount = 2;
    private configured = false;

    // OpusHead bytes used as the AudioDecoder `description`, captured once at first configure and
    // reused verbatim on a range-continuation reconfigure.
    private opusHeadDescription: Uint8Array | null = null;

    // Decoded AudioData awaiting conversion, filled by the AudioDecoder output callback.
    private decodedQueue: AudioData[] = [];

    private fatalError = false;

    // Frames still to discard from the head of the post-seek decoded output (fine re-sync).
    // Armed by reinitializeForRangeContinuation, consumed in audioDataToBuffer until it reaches 0.
    private leadTrimFrames = 0;

    // Synthetic, monotonically increasing timestamp (microseconds) handed to each
    // EncodedAudioChunk. Scheduling is done by accumulation, so the true value is not needed.
    private nextTimestampUs = 0;

    // Presentation-frame position of the next frame to be emitted, measured from the start of the
    // track. Compared against totalFrames for the end-trim.
    private emittedFrames = 0;

    // Total track length in 48 kHz frames, or +Infinity when the sidecar carries no duration.
    private readonly totalFrames: number;

    /**
     * @param contextManager  Owner of the AudioContext that decoded buffers are created on.
     * @param sidecar         Opus sidecar data (setup header bytes and total duration).
     * @param isSchedulerFull Optional back-pressure probe; when it returns true, push() stashes
     *                        bytes instead of decoding. Defaults to null (no back-pressure).
     */
    constructor(
        contextManager: AudioContextManager,
        sidecar: OpusSeekData,
        isSchedulerFull: (() => boolean) | null = null) {
        this.contextManager = contextManager;
        this.sidecar = sidecar;
        this.isSchedulerFull = isSchedulerFull;
        this.totalFrames = sidecar.totalDurationSeconds > 0
            ? Math.round(sidecar.totalDurationSeconds * OPUS_SAMPLE_RATE)
            : Number.POSITIVE_INFINITY;
    }

    /** True once an unrecoverable configure/decode error has been observed. */
    get hasFatalError(): boolean {
        return this.fatalError;
    }

    /** True once the WebCodecs decoder has been created and configured. */
    get ready(): boolean {
        return this.configured;
    }

    /** Total duration in seconds from the sidecar, or null when the sidecar has none (<= 0). */
    get totalDuration(): number | null {
        return this.sidecar.totalDurationSeconds > 0 ? this.sidecar.totalDurationSeconds : null;
    }

    /** Extract a printable message from an arbitrary thrown value without an unchecked cast. */
    private static errorText(err: unknown): string {
        return err instanceof Error ? err.message : String(err);
    }

    /**
     * Lazily build + configure the WebCodecs decoder from the sidecar's OpusHead. Idempotent.
     * Recreates the AudioContext at 48 kHz when it is running at another rate so decoded audio
     * needs no resampling.
     *
     * @returns true when the decoder is configured; false (with `fatalError` set) when WebCodecs
     *          is unavailable, the OpusHead cannot be extracted, or constructing/configuring the
     *          decoder throws. Decoder construction failures are reported this way rather than
     *          thrown into the stream loop.
     */
    private async ensureConfigured(): Promise<boolean> {
        if (this.configured) return true;

        if (typeof AudioDecoder === 'undefined') {
            this.fatalError = true;
            return false;
        }

        const opusHead = extractOpusHead(this.sidecar.setupHeaderBytes);
        if (!opusHead) {
            this.fatalError = true;
            return false;
        }

        this.channelCount = opusHeadChannelCount(opusHead);
        // Copy the OpusHead into a standalone buffer so the description does not alias the sidecar.
        this.opusHeadDescription = opusHead.slice();

        // Opus decodes at 48 kHz; align the context so no resample is needed.
        if (this.contextManager.sampleRate !== OPUS_SAMPLE_RATE) {
            await this.contextManager.recreateWithSampleRate(OPUS_SAMPLE_RATE);
        }

        // Both the constructor and configure() can throw synchronously (e.g. an invalid config).
        // Fail safe: mark the decoder fatal and release a half-built instance instead of throwing.
        let decoder: AudioDecoder | null = null;
        try {
            decoder = new AudioDecoder({
                output: (data) => {
                    this.decodedQueue.push(data);
                },
                error: (err) => {
                    console.error('Opus AudioDecoder error:', err.message);
                    this.fatalError = true;
                }
            });
            decoder.configure(this.buildConfig());
        } catch (err) {
            console.error('Opus AudioDecoder configure failed:', OpusStreamDecoder.errorText(err));
            if (decoder && decoder.state !== 'closed') {
                try { decoder.close(); } catch { /* already closed */ }
            }
            this.fatalError = true;
            return false;
        }

        this.decoder = decoder;
        this.configured = true;
        return true;
    }

    /** Build the decoder config from the cached channel count and OpusHead description. */
    private buildConfig(): AudioDecoderConfig {
        return {
            codec: 'opus',
            sampleRate: OPUS_SAMPLE_RATE,
            numberOfChannels: this.channelCount,
            description: this.opusHeadDescription ?? undefined
        };
    }

    /**
     * Feed the next chunk of the Ogg byte stream.
     *
     * While `isSchedulerFull` reports true the chunk is only stashed (no demux, no decode, no
     * decoder configure) and an empty array is returned. Otherwise any stashed bytes are replayed
     * first, in arrival order (Ogg demux is order-sensitive), followed by this chunk.
     *
     * @param chunk Raw bytes of the stream, in order.
     * @returns The AudioBuffers decoded so far; empty after a fatal error or while throttled.
     */
    async push(chunk: Uint8Array): Promise<AudioBuffer[]> {
        if (this.fatalError) return [];

        if (this.isSchedulerFull?.()) {
            this.pendingBytes.push(chunk);
            return [];
        }

        if (!(await this.ensureConfigured())) return [];

        this.replayPendingBytes();
        this.decodePackets(this.demuxer.push(chunk));

        // Give the WebCodecs decoder a chance to process the queued packets before draining.
        await this.yieldToDecoder();
        return this.drainDecoded();
    }

    /**
     * Signal end-of-stream: replay any bytes still stashed by back-pressure, flush the decoder and
     * return the remaining decoded audio.
     *
     * @returns The final AudioBuffers. After a fatal error, or when no configured decoder exists,
     *          only what is already in the decoded queue is returned.
     */
    async complete(): Promise<AudioBuffer[]> {
        if (this.fatalError) {
            return this.drainDecoded();
        }

        // End-of-stream may arrive while bytes are still stashed. Configure if needed and replay
        // them so the tail is decoded before the flush; if configuring fails, drop the stash.
        if (this.pendingBytes.length > 0) {
            if (await this.ensureConfigured()) {
                this.replayPendingBytes();
            } else {
                this.pendingBytes = [];
            }
        }

        if (!this.decoder || this.decoder.state !== 'configured') {
            return this.drainDecoded();
        }

        try {
            await this.decoder.flush();
        } catch (err) {
            // A flush can reject if the decoder was reset/closed concurrently; surface nothing
            // beyond a warning and drain what we have.
            console.warn('Opus decoder flush interrupted:', OpusStreamDecoder.errorText(err));
        }
        return this.drainDecoded();
    }

    /**
     * Reinitialize for a Range-continuation stream after seek-beyond-buffer.
     *
     * Resets the demuxer into continuation mode, discards (and closes) any decoded-but-undrained
     * audio, drops bytes stashed by back-pressure (they belong to the pre-seek position), arms the
     * lead trim, re-bases the end-trim position, and resets + reconfigures the decoder when it is
     * currently configured.
     *
     * @param landingTimeSeconds Page-start presentation time the new stream begins at
     *                           (expected to be <= targetTimeSeconds).
     * @param targetTimeSeconds  The user-requested seek position. `(target - landing) * 48000`
     *                           frames are trimmed from the head of the decoded output so playback
     *                           lands at the target.
     */
    reinitializeForRangeContinuation(landingTimeSeconds: number, targetTimeSeconds: number): void {
        this.demuxer.reset(true);

        // Queued AudioData must be closed explicitly, not just dropped, to release its memory.
        this.closeDecodedQueue();
        this.pendingBytes = [];

        // Lead trim: skip enough decoded frames to land at the target rather than the page start.
        // Clamped to >= 0 to guard against floating-point rounding.
        this.leadTrimFrames = Math.max(
            0,
            Math.round((targetTimeSeconds - landingTimeSeconds) * OPUS_SAMPLE_RATE));

        // End trim: the first frame emitted after the seek sits at (or after) the later of the two
        // times, so continue counting from there. Restarting from 0 would leave room for the whole
        // track again and the cap at totalFrames would never engage after a seek. Using the
        // earliest possible start keeps the cap from ever cutting real audio.
        const resumeFrame = Math.round(
            Math.max(landingTimeSeconds, targetTimeSeconds) * OPUS_SAMPLE_RATE);
        this.emittedFrames = Number.isFinite(resumeFrame) && resumeFrame > 0 ? resumeFrame : 0;

        // reset() drops queued work and returns the decoder to 'unconfigured'; reconfigure with
        // the cached config so inter-frame state restarts cleanly.
        if (this.decoder && this.decoder.state === 'configured') {
            this.decoder.reset();
            this.decoder.configure(this.buildConfig());
        }
    }

    /** Release all decoder resources: close queued AudioData, drop the stash, close the decoder. */
    dispose(): void {
        this.closeDecodedQueue();
        this.pendingBytes = [];
        if (this.decoder && this.decoder.state !== 'closed') {
            try { this.decoder.close(); } catch { /* already closed */ }
        }
        this.decoder = null;
        this.configured = false;
    }

    /** Close every queued AudioData and empty the decoded queue. */
    private closeDecodedQueue(): void {
        for (const data of this.decodedQueue) {
            try { data.close(); } catch { /* already closed */ }
        }
        this.decodedQueue = [];
    }

    /** Demux + decode all bytes stashed by back-pressure, in arrival order, and clear the stash. */
    private replayPendingBytes(): void {
        if (this.pendingBytes.length === 0) return;
        const stashed = this.pendingBytes;
        this.pendingBytes = [];
        for (const bytes of stashed) {
            this.decodePackets(this.demuxer.push(bytes));
        }
    }

    /**
     * Queue each non-empty packet on the decoder as an EncodedAudioChunk. Does nothing when the
     * decoder is missing or not in the 'configured' state. A throwing decode() marks the decoder
     * fatal and abandons the remaining packets.
     */
    private decodePackets(packets: OpusPacket[]): void {
        if (!this.decoder || this.decoder.state !== 'configured') return;

        // Advance the synthetic clock by a packet's maximum duration; the exact value is immaterial.
        const packetStepUs = Math.round((MAX_PACKET_FRAMES / OPUS_SAMPLE_RATE) * 1_000_000);

        for (const pkt of packets) {
            if (pkt.data.length === 0) continue;

            // Opus has no key/delta distinction; every packet is submitted as a 'key' chunk.
            const chunk = new EncodedAudioChunk({
                type: 'key',
                timestamp: this.nextTimestampUs,
                data: pkt.data
            });
            this.nextTimestampUs += packetStepUs;

            try {
                this.decoder.decode(chunk);
            } catch (err) {
                console.error('Opus decode() threw:', OpusStreamDecoder.errorText(err));
                this.fatalError = true;
                return;
            }
        }
    }

    /**
     * Convert every queued AudioData into an AudioBuffer (lead-trim and end-trim applied), closing
     * each AudioData after conversion whether or not the conversion succeeded.
     */
    private drainDecoded(): AudioBuffer[] {
        const out: AudioBuffer[] = [];
        const ctx = this.contextManager.getContext();
        while (this.decodedQueue.length > 0) {
            const data = this.decodedQueue.shift()!;
            try {
                const buffer = this.audioDataToBuffer(ctx, data);
                if (buffer) out.push(buffer);
            } finally {
                try { data.close(); } catch { /* already closed */ }
            }
        }
        return out;
    }

    /**
     * Copy an AudioData's PCM into a new AudioBuffer, applying:
     *   1. Lead-trim: skip up to `leadTrimFrames` from the front (post-seek fine re-sync).
     *   2. End-trim: cap the cumulative position at `totalFrames` so padding in the final packet
     *      does not leak past the true stream end.
     *
     * @returns The new buffer, or null when either trim leaves zero usable frames.
     */
    private audioDataToBuffer(ctx: BaseAudioContext, data: AudioData): AudioBuffer | null {
        const frames = data.numberOfFrames;
        const channels = data.numberOfChannels;

        let skip = 0;
        if (this.leadTrimFrames > 0) {
            skip = Math.min(this.leadTrimFrames, frames);
            this.leadTrimFrames -= skip;
        }
        const available = frames - skip;
        if (available <= 0) return null;

        let keep = available;
        if (Number.isFinite(this.totalFrames)) {
            const room = this.totalFrames - this.emittedFrames;
            if (room <= 0) return null;
            if (room < available) keep = room;
        }
        if (keep <= 0) return null;

        const buffer = ctx.createBuffer(channels, keep, data.sampleRate);
        // One scratch plane sized to the kept frames; frameOffset skips the lead-trim region.
        const plane = new Float32Array(keep);
        for (let ch = 0; ch < channels; ch++) {
            data.copyTo(plane, { planeIndex: ch, frameOffset: skip, frameCount: keep, format: 'f32-planar' });
            buffer.copyToChannel(plane, ch);
        }
        this.emittedFrames += keep;
        return buffer;
    }

    /**
     * Poll until the decoder's `decodeQueueSize` reaches 0 (or the decoder is gone), checking
     * every 4 ms for at most MAX_YIELD_ITERS iterations (about 200 ms) so a stuck decoder cannot
     * block the stream loop. `complete()` relies on `flush()` instead.
     */
    private yieldToDecoder(): Promise<void> {
        const MAX_YIELD_ITERS = 50; // 50 x 4 ms = 200 ms ceiling
        return new Promise<void>((resolve) => {
            let iters = 0;
            const poll = () => {
                if (!this.decoder || this.decoder.decodeQueueSize === 0 || iters >= MAX_YIELD_ITERS) {
                    resolve();
                    return;
                }
                iters++;
                setTimeout(poll, 4);
            };
            poll();
        });
    }
}