Files
deepdrft/DeepDrftPublic/Interop/audio/OpusStreamDecoder.ts
T

263 lines
12 KiB
TypeScript

/**
* OpusStreamDecoder - the WebCodecs streaming Opus decode pipeline.
*
* This replaces the fundamentally broken per-segment `decodeAudioData` Opus model. Instead of cutting
* the Ogg stream into page-runs and decoding each as a standalone file (which re-applies pre-skip and
* starts from cold codec state at every boundary), it feeds a single stateful WebCodecs `AudioDecoder`
* the demuxed Opus packets in order — correct pre-skip-once handling and full inter-frame continuity.
*
* Pipeline: OggDemuxer (pages -> Opus packets + granule) -> AudioDecoder (codec 'opus', configured
* from the OpusHead in the sidecar) -> AudioData (48 kHz PCM) -> AudioBuffer -> PlaybackScheduler.
*
* Pre-skip (encoder delay): handled ONCE, by the decoder. WebCodecs decodes Opus with the OpusHead
* passed as `AudioDecoderConfig.description`; the OpusHead carries `pre_skip`, and the WebCodecs Opus
* decoder discards those leading samples itself. We do NOT re-trim per packet — doing so on top of the
* decoder's own trim would double-count. This is the spec-intended path (W3C WebCodecs Opus registration).
*
* End-trim: the sidecar's `totalDurationSeconds` is the exact pre-skip-corrected stream length. We cap
* cumulative emitted audio at that length so the final partial frame's padding does not leak past the
* true end. (Granule-position end-trim from the EOS page is the alternative; capping on the known total
* is equivalent and simpler, and the sidecar total is authoritative.)
*
* Sample rate: Opus always decodes at 48 kHz (RFC 7845). We force the AudioContext to 48 kHz at init so
* the decoded AudioData needs no resampling before scheduling — the same `recreateWithSampleRate` seam
* the WAV path uses for non-44.1 sources.
*
* BROWSER-VERIFIED. The actual decode/playback/trim correctness is verified in Daniel's browser
* (WebCodecs cannot run in Node/headless here). The Ogg demux, packet timing, and end-trim *math* are
* unit-tested; the WebCodecs glue (configure/decode/flush/AudioData->AudioBuffer) is browser-verified.
*/
import { AudioContextManager } from './AudioContextManager.js';
import { IStreamingDecoder } from './IStreamingDecoder.js';
import { OggDemuxer, OpusPacket, extractOpusHead, opusHeadChannelCount } from './OggDemuxer.js';
import { OpusSeekData, OPUS_SAMPLE_RATE } from './OpusSidecar.js';
/**
 * Frame count of the longest legal Opus packet: 120 ms at 48 kHz, i.e. 120 ms × 48 frames/ms = 5760.
 * Only feeds the synthetic chunk-timestamp clock; it never affects decoded audio.
 */
const MAX_PACKET_FRAMES = 120 * 48;
/**
 * Streaming Opus decoder: demuxes Ogg pages into Opus packets, feeds them to one stateful WebCodecs
 * `AudioDecoder`, and converts the decoded `AudioData` into end-trimmed `AudioBuffer`s.
 */
export class OpusStreamDecoder implements IStreamingDecoder {
    /**
     * Synthetic timestamp advance per packet, in microseconds: one maximum-length (120 ms) Opus packet.
     * Hoisted out of the decode loop. The value only needs to keep chunk timestamps monotonic.
     */
    private static readonly TIMESTAMP_STEP_US = Math.round((MAX_PACKET_FRAMES / OPUS_SAMPLE_RATE) * 1_000_000);

    private readonly contextManager: AudioContextManager;
    private readonly sidecar: OpusSeekData;
    private readonly demuxer = new OggDemuxer();
    private decoder: AudioDecoder | null = null;
    private channelCount = 2;
    private configured = false;
    // Bumped by dispose(). ensureConfigured() awaits a context recreate. If dispose() runs during that
    // await (e.g. a track switch), the stale configure attempt sees a changed generation and bails out
    // instead of building an AudioDecoder that nothing will ever close.
    private generation = 0;
    // OpusHead bytes used as the AudioDecoder `description`, captured once at first configure and reused
    // verbatim on a range-continuation reconfigure (avoids re-extracting / a non-null assertion).
    private opusHeadDescription: Uint8Array | null = null;
    // Decoded AudioData awaiting conversion, filled by the AudioDecoder output callback.
    private decodedQueue: AudioData[] = [];
    private fatalError = false;
    // Monotonic packet timestamp (microseconds) handed to each EncodedAudioChunk. Scheduling is done by
    // accumulation, so the true value is irrelevant here. A synthetic 48 kHz-derived counter keeps the
    // timestamps increasing and exact.
    private nextTimestampUs = 0;
    // Cumulative frames already emitted as AudioBuffers, for end-trim against the known total length.
    private emittedFrames = 0;
    private readonly totalFrames: number;

    /**
     * @param contextManager Owner of the AudioContext; switched to 48 kHz on first configure.
     * @param sidecar Seek/setup metadata. Supplies the OpusHead (`setupHeaderBytes`) and the exact
     *   pre-skip-corrected `totalDurationSeconds` used for end-trim (<= 0 disables end-trim).
     */
    constructor(contextManager: AudioContextManager, sidecar: OpusSeekData) {
        this.contextManager = contextManager;
        this.sidecar = sidecar;
        this.totalFrames = sidecar.totalDurationSeconds > 0
            ? Math.round(sidecar.totalDurationSeconds * OPUS_SAMPLE_RATE)
            : Number.POSITIVE_INFINITY;
    }

    /** True once any configure/decode failure has been observed. Further pushes are no-ops. */
    get hasFatalError(): boolean {
        return this.fatalError;
    }

    /** True once the WebCodecs decoder has been built and configured. */
    get ready(): boolean {
        return this.configured;
    }

    /** Known stream length in seconds from the sidecar, or null when the sidecar gives none. */
    get totalDuration(): number | null {
        return this.sidecar.totalDurationSeconds > 0 ? this.sidecar.totalDurationSeconds : null;
    }

    /** Extract a loggable message from an unknown thrown value. */
    private static describeError(err: unknown): string {
        return err instanceof Error ? err.message : String(err);
    }

    /**
     * Lazily build and configure the WebCodecs decoder from the sidecar's OpusHead. Idempotent. Forces the
     * AudioContext to 48 kHz so decoded AudioData schedules without resampling.
     *
     * @returns false (and sets `fatalError` where appropriate) when the decoder cannot be set up:
     *   WebCodecs is missing, the OpusHead is absent, or `configure()` throws synchronously. It also
     *   returns false if the instance was disposed during the context recreate. The caller is not
     *   expected to reach a failure here, because the capability gate runs first, but we fail safe
     *   rather than throw into the stream loop.
     */
    private async ensureConfigured(): Promise<boolean> {
        if (this.configured) return true;
        if (typeof AudioDecoder === 'undefined') {
            this.fatalError = true;
            return false;
        }
        const opusHead = extractOpusHead(this.sidecar.setupHeaderBytes);
        if (!opusHead) {
            this.fatalError = true;
            return false;
        }
        this.channelCount = opusHeadChannelCount(opusHead);
        // Copy the OpusHead into a standalone buffer — the sidecar subarray is a view we keep.
        this.opusHeadDescription = opusHead.slice();

        const generation = this.generation;
        // Opus decodes at 48 kHz; align the context so no resample is needed.
        if (this.contextManager.sampleRate !== OPUS_SAMPLE_RATE) {
            await this.contextManager.recreateWithSampleRate(OPUS_SAMPLE_RATE);
        }
        // dispose() ran while we awaited: don't build a decoder for a torn-down instance.
        if (generation !== this.generation) return false;
        // A concurrent caller may have finished configuring while we awaited. Reuse its decoder.
        if (this.configured) return true;

        const decoder = new AudioDecoder({
            output: (data) => this.decodedQueue.push(data),
            error: (err) => {
                console.error('Opus AudioDecoder error:', err.message);
                this.fatalError = true;
            }
        });
        try {
            // configure() throws synchronously on an invalid config. A valid-but-unsupported config is
            // reported asynchronously through the error callback above instead.
            decoder.configure(this.buildConfig());
        } catch (err) {
            console.error('Opus AudioDecoder configure() threw:', OpusStreamDecoder.describeError(err));
            try { decoder.close(); } catch { /* already closed */ }
            this.fatalError = true;
            return false;
        }
        this.decoder = decoder;
        this.configured = true;
        return true;
    }

    /** AudioDecoderConfig for 48 kHz Opus, with the cached OpusHead as `description` (when captured). */
    private buildConfig(): AudioDecoderConfig {
        return {
            codec: 'opus',
            sampleRate: OPUS_SAMPLE_RATE,
            numberOfChannels: this.channelCount,
            description: this.opusHeadDescription ?? undefined
        };
    }

    /**
     * Feed one chunk of the Ogg byte stream. Returns whatever AudioBuffers the decoder produced by the
     * time control returns (possibly none). Later outputs are picked up by subsequent calls.
     */
    async push(chunk: Uint8Array): Promise<AudioBuffer[]> {
        if (this.fatalError) return [];
        if (!(await this.ensureConfigured())) return [];
        const packets = this.demuxer.push(chunk);
        this.decodePackets(packets);
        // Give the WebCodecs output callback a chance to run before we drain.
        await this.yieldToDecoder();
        return this.drainDecoded(false);
    }

    /** End of stream: flush the decoder so every pending output is delivered, then drain it all. */
    async complete(): Promise<AudioBuffer[]> {
        if (this.fatalError || !this.decoder || this.decoder.state !== 'configured') {
            return this.drainDecoded(true);
        }
        try {
            await this.decoder.flush();
        } catch (err) {
            // A flush can reject if the decoder was reset/closed concurrently (track switch); the loop's
            // own cancellation handles that — surface nothing, just drain what we have.
            console.warn('Opus decoder flush interrupted:', OpusStreamDecoder.describeError(err));
        }
        return this.drainDecoded(true);
    }

    /**
     * Prepare for a new 206 body (seek). The body starts on a page boundary with no setup pages. The codec
     * config is unchanged, but inter-frame state must restart cleanly. AudioDecoder.reset() drops queued
     * work and returns the decoder to 'unconfigured', so we reconfigure with the cached config. The demuxer
     * goes into continuation mode (the first page's packets are treated as audio).
     */
    reinitializeForRangeContinuation(): void {
        this.demuxer.reset(true);
        // Release the dropped AudioData instead of just forgetting it (it holds media resources).
        this.closeQueuedAudioData();
        // Post-seek buffers are positioned by the scheduler's playbackOffset.
        // NOTE(review): because the seek offset is unknown here, end-trim after a seek only caps output at
        // the full track length, so the final packet's padding may not be trimmed. Confirm this is acceptable.
        this.emittedFrames = 0;
        if (this.decoder && this.decoder.state === 'configured') {
            this.decoder.reset();
            try {
                this.decoder.configure(this.buildConfig());
            } catch (err) {
                console.error('Opus AudioDecoder reconfigure() threw:', OpusStreamDecoder.describeError(err));
                this.fatalError = true;
            }
        }
    }

    /** Release every queued AudioData and close the decoder. A later push() would configure afresh. */
    dispose(): void {
        // Invalidate any ensureConfigured() currently suspended on the context recreate.
        this.generation++;
        this.closeQueuedAudioData();
        if (this.decoder && this.decoder.state !== 'closed') {
            try { this.decoder.close(); } catch { /* already closed */ }
        }
        this.decoder = null;
        this.configured = false;
    }

    /** Close and drop every AudioData still awaiting conversion. */
    private closeQueuedAudioData(): void {
        for (const data of this.decodedQueue) {
            try { data.close(); } catch { /* already closed */ }
        }
        this.decodedQueue = [];
    }

    /** Submit demuxed packets to the decoder in order. Skips empty packets and stops at the first decode() throw. */
    private decodePackets(packets: OpusPacket[]): void {
        if (!this.decoder || this.decoder.state !== 'configured') return;
        for (const pkt of packets) {
            if (pkt.data.length === 0) continue;
            // Opus has no key/delta distinction, so for WebCodecs every packet is submitted as 'key'.
            const chunk = new EncodedAudioChunk({
                type: 'key',
                timestamp: this.nextTimestampUs,
                data: pkt.data
            });
            this.nextTimestampUs += OpusStreamDecoder.TIMESTAMP_STEP_US;
            try {
                this.decoder.decode(chunk);
            } catch (err) {
                console.error('Opus decode() threw:', OpusStreamDecoder.describeError(err));
                this.fatalError = true;
                return;
            }
        }
    }

    /**
     * Convert every queued AudioData into an AudioBuffer, applying end-trim against the known total frame
     * count, and close each AudioData. `_final` is currently unused: final and non-final drains behave
     * identically, because end-trim is applied per buffer on every drain.
     */
    private drainDecoded(_final: boolean): AudioBuffer[] {
        const out: AudioBuffer[] = [];
        const ctx = this.contextManager.getContext();
        while (this.decodedQueue.length > 0) {
            const data = this.decodedQueue.shift()!;
            try {
                const buffer = this.audioDataToBuffer(ctx, data);
                if (buffer) out.push(buffer);
            } finally {
                try { data.close(); } catch { /* already closed */ }
            }
        }
        return out;
    }

    /**
     * Copy an AudioData's PCM into a new AudioBuffer, trimming so cumulative output never exceeds the known
     * total length (end-trim). Returns null if the trim leaves zero frames (the buffer lies entirely past
     * the end) or the AudioData is empty.
     */
    private audioDataToBuffer(ctx: BaseAudioContext, data: AudioData): AudioBuffer | null {
        const frames = data.numberOfFrames;
        const channels = data.numberOfChannels;
        // End-trim: cap cumulative output at totalFrames.
        let keep = frames;
        if (Number.isFinite(this.totalFrames)) {
            const room = this.totalFrames - this.emittedFrames;
            if (room <= 0) return null;
            if (room < frames) keep = room;
        }
        if (keep <= 0) return null;
        const buffer = ctx.createBuffer(channels, keep, data.sampleRate);
        const plane = new Float32Array(frames); // copyTo fills the full frame count, then we slice
        for (let ch = 0; ch < channels; ch++) {
            data.copyTo(plane, { planeIndex: ch, format: 'f32-planar' });
            buffer.copyToChannel(keep === frames ? plane : plane.subarray(0, keep), ch);
        }
        this.emittedFrames += keep;
        return buffer;
    }

    /**
     * Yield one macrotask (zero-delay timeout) so the async output callbacks for the synchronous decode()
     * calls can fire before we drain. Outputs that arrive later are drained on the next push() or by
     * complete()'s flush. Awaiting decodeQueueSize draining is the precise alternative, but not all
     * engines settle it synchronously.
     */
    private yieldToDecoder(): Promise<void> {
        return new Promise((resolve) => setTimeout(resolve, 0));
    }
}