879c30a5e5
StreamDecoder peeks candidate bytes; FlacFormatDecoder scans backward for 0xFF/0xF8 sync. Fixes mid-stream decode failure where segments started mid-frame.
512 lines
21 KiB
TypeScript
512 lines
21 KiB
TypeScript
/**
|
|
* StreamDecoder - Handles audio stream parsing and AudioBuffer decoding.
|
|
*
|
|
* Single Responsibility: Convert a raw audio stream into decoded AudioBuffers.
|
|
* Format-specific work (header parsing, segment alignment, segment wrapping, seek
|
|
* byte math) is delegated to an IFormatDecoder supplied at initialize time; this
|
|
* class owns only the format-agnostic concerns: chunk accumulation, header search
|
|
* bounding, stream-complete detection, decode timeout/retry, and range continuation.
|
|
*/
|
|
|
|
import { AudioContextManager } from './AudioContextManager.js';
|
|
import { FormatInfo, IFormatDecoder } from './IFormatDecoder.js';
|
|
|
|
/**
 * One decoded segment produced by StreamDecoder.processChunk / markStreamComplete.
 */
export interface DecodedChunkResult {
  /** Decoded audio for this segment, as returned by the AudioContextManager. */
  buffer: AudioBuffer;

  /** Segment length in seconds; StreamDecoder fills this from `buffer.duration`. */
  duration: number;
}
|
|
|
|
/**
 * Raised when decodeAudioData does not settle within the per-segment deadline.
 *
 * Kept separate from DecodeError so callers (and anyone reading logs) can tell a
 * slow or throttled decoder apart from corrupt audio data; a plain Error carrying
 * a "Decode timeout" message could not be distinguished from other failures.
 */
export class DecodeTimeoutError extends Error {
  /** Stream-relative byte offset of the segment that timed out. */
  public readonly segmentOffset: number;

  /** Number of bytes in the segment that timed out. */
  public readonly byteCount: number;

  constructor(segmentOffset: number, byteCount: number) {
    super(`Decode timeout at offset ${segmentOffset} (${byteCount} bytes)`);
    this.segmentOffset = segmentOffset;
    this.byteCount = byteCount;
    this.name = 'DecodeTimeoutError';
  }
}
|
|
|
|
/**
 * Raised when decodeAudioData rejects for a reason other than the deadline
 * (corrupt header, unsupported format, …).
 *
 * Records which segment of the stream failed, and optionally the underlying
 * error that triggered the failure.
 */
export class DecodeError extends Error {
  /** Stream-relative byte offset of the failed segment. */
  public readonly segmentOffset: number;

  /** Number of bytes in the failed segment. */
  public readonly byteCount: number;

  /** Original error reported by the decoder, when one was supplied. */
  public readonly cause?: Error;

  constructor(message: string, segmentOffset: number, byteCount: number, cause?: Error) {
    super(message);
    this.segmentOffset = segmentOffset;
    this.byteCount = byteCount;
    this.cause = cause;
    this.name = 'DecodeError';
  }
}
|
|
|
|
/**
 * Format-agnostic streaming audio decoder.
 *
 * Lifecycle:
 * - initialize() with the expected stream length and a format decoder.
 * - processChunk() for every network chunk.
 * - markStreamComplete() when the transport reports end-of-stream.
 * - reinitializeForRangeContinuation() after a seek answered with a 206 body.
 *
 * Decoded segments are returned from processChunk / markStreamComplete.
 */
export class StreamDecoder {
  // Upper bound on pre-header accumulation. 256 KB is far beyond any sane audio
  // header (WAV with extended LIST/INFO/JUNK chunks, FLAC metadata blocks, etc.).
  // If this many bytes have accumulated and the format decoder still cannot parse
  // a header, the stream is treated as corrupt.
  private static readonly MAX_HEADER_SEARCH_BYTES = 256 * 1024;

  // Maximum number of raw bytes peeked per decode attempt (64 KB). The format
  // decoder may shrink a segment to a frame/block boundary; it is clamped so it
  // can never exceed the bytes actually peeked.
  private static readonly SEGMENT_SIZE = 64 * 1024;

  private readonly contextManager: AudioContextManager;
  private formatDecoder: IFormatDecoder | null = null;
  private formatInfo: FormatInfo | null = null;

  // Raw audio bytes (header already stripped). rawChunks[0] begins at the
  // stream-relative offset rawChunksBaseOffset. Chunks ending at or before
  // processedBytes are never read again, so they are released after each
  // successful decode. Memory is bounded by the undecoded backlog rather than
  // by the whole stream.
  private rawChunks: Uint8Array[] = [];
  private rawChunksBaseOffset: number = 0;

  // totalRawBytes and processedBytes are JS number (IEEE 754 double), which can
  // represent integers exactly up to 2^53 bytes (~8 PB). WAV files are bounded
  // at 4 GB by the 32-bit RIFF size field, so overflow is not a practical concern.
  private totalRawBytes: number = 0;
  private processedBytes: number = 0;
  private totalStreamLength: number = 0;
  private streamComplete: boolean = false;
  private headerError: string | null = null;

  // Range-continuation state. After a seek-beyond-buffer the server responds 206
  // with raw audio from a file-absolute offset (no header). The FormatInfo parsed
  // from the initial stream is retained and the whole body is treated as audio
  // data. The stream-complete check then compares raw bytes against the 206
  // Content-Length (remainingByteLength) instead of totalStreamLength + audioDataOffset.
  private isContinuation: boolean = false;
  private remainingByteLength: number = 0;

  // Pre-header accumulator. Audio headers can span multiple network chunks
  // (small first segment, extended WAV LIST/INFO/JUNK chunks before 'data',
  // FLAC metadata blocks, etc.). Raw bytes are therefore buffered here until the
  // format decoder parses a header, rather than assuming the header fits in the
  // first chunk.
  private headerBytesReceived: number = 0;
  private headerSearchChunks: Uint8Array[] = [];

  constructor(contextManager: AudioContextManager) {
    this.contextManager = contextManager;
  }

  /**
   * Initialize for a new stream. The format decoder owns all format-specific
   * parsing/wrapping/seek math for this stream's lifetime.
   *
   * @param totalStreamLength full stream size in bytes (header included); a value
   *   <= 0 means "unknown" and disables byte-count completion detection.
   * @param formatDecoder format-specific helper used until the next initialize().
   */
  initialize(totalStreamLength: number, formatDecoder: IFormatDecoder): void {
    this.formatDecoder = formatDecoder;
    this.formatInfo = null;
    this.totalStreamLength = totalStreamLength;
    this.clearStreamBuffers();
    this.isContinuation = false;
    this.remainingByteLength = 0;
  }

  /**
   * Process an incoming chunk and return all decoded AudioBuffers ready so far.
   *
   * The result is an array (possibly empty) rather than a single result for two
   * reasons. The final chunk may unlock the residual tail in addition to a full
   * segment. A chunk that completes header parsing may also carry enough audio
   * data to decode immediately.
   *
   * @throws Error if the header search has failed (stream corrupt/unsupported).
   * @throws DecodeTimeoutError | DecodeError if a segment fails to decode.
   */
  async processChunk(chunk: Uint8Array): Promise<DecodedChunkResult[]> {
    // Once the header search has failed, every subsequent chunk is rejected.
    if (this.headerError) {
      throw new Error(this.headerError);
    }

    if (this.formatInfo) {
      this.addRawData(chunk);
    } else {
      await this.tryParseHeader(chunk);
      // tryParseHeader may have just set headerError.
      if (this.headerError) {
        throw new Error(this.headerError);
      }
    }

    this.updateStreamCompleteFlag();
    return this.drainDecodableSegments();
  }

  /**
   * Decode segments until tryDecodeNextSegment reports nothing more is decodable.
   *
   * Looping, rather than decoding one segment per call, is what flushes the
   * trailing tail that only becomes decodable once streamComplete flips true.
   *
   * NOTE(review): if a later segment throws, buffers decoded earlier in the same
   * drain are not returned, even though their bytes are already counted in
   * processedBytes.
   */
  private async drainDecodableSegments(): Promise<DecodedChunkResult[]> {
    const results: DecodedChunkResult[] = [];
    let segment: DecodedChunkResult | null;
    while ((segment = await this.tryDecodeNextSegment()) !== null) {
      results.push(segment);
    }
    return results;
  }

  /**
   * Accumulate bytes into the header-search buffer and retry the format decoder's
   * header parse. Once a header is recognised, everything past audioDataOffset
   * becomes audio data.
   *
   * The size limit is enforced only after a failed parse attempt. A single large
   * chunk that carries a valid header (plus audio) is therefore accepted, rather
   * than rejected purely for its size.
   */
  private async tryParseHeader(chunk: Uint8Array): Promise<void> {
    this.headerSearchChunks.push(chunk);
    this.headerBytesReceived += chunk.length;

    const info = this.requireFormatDecoder().tryParseHeader(
      this.headerSearchChunks,
      this.headerBytesReceived
    );

    if (!info) {
      // Guard against unbounded accumulation from a corrupt or unrecognised stream.
      if (this.headerBytesReceived > StreamDecoder.MAX_HEADER_SEARCH_BYTES) {
        this.headerError = `Audio header not found after ${this.headerBytesReceived} bytes — stream may be corrupt or an unsupported format`;
        console.error(this.headerError);
        // Drop the search buffer so subsequent chunks are not accumulated either.
        this.headerSearchChunks = [];
        this.headerBytesReceived = 0;
      }
      // Otherwise there are not enough bytes yet, so wait for the next chunk. If
      // the stream ends without ever producing a valid header, nothing is decoded
      // and the caller receives no audio.
      return;
    }

    this.formatInfo = info;

    // Move the audio-data tail (everything past audioDataOffset) into the raw
    // buffer BEFORE any await. If the AudioContext recreation below throws, the
    // tail is not lost. A chunk that arrives while awaiting is appended after the
    // tail, not ahead of it.
    const concatenated = new Uint8Array(this.headerBytesReceived);
    let writeOffset = 0;
    for (const searchChunk of this.headerSearchChunks) {
      concatenated.set(searchChunk, writeOffset);
      writeOffset += searchChunk.length;
    }
    const audioData = concatenated.subarray(info.audioDataOffset);
    if (audioData.length > 0) {
      this.addRawData(audioData);
    }

    // The header-search buffer is no longer needed.
    this.headerSearchChunks = [];
    this.headerBytesReceived = 0;

    // Recreate the AudioContext with the stream's sample rate if it differs.
    if (this.contextManager.sampleRate !== info.sampleRate) {
      await this.contextManager.recreateWithSampleRate(info.sampleRate);
    }
  }

  /**
   * Mark the stream complete once all expected bytes have been received.
   *
   * The computation depends on how far header parsing has progressed. Once a
   * header is parsed, raw audio bytes plus the header size are compared with the
   * stream length. Before that, pre-header bytes are compared instead.
   */
  private updateStreamCompleteFlag(): void {
    // Range-continuation: the 206 body is pure audio (no header), so compare raw
    // audio bytes directly against the 206 Content-Length. Do NOT add headerSize,
    // because there is no header in this response.
    if (this.isContinuation) {
      if (this.remainingByteLength > 0 && this.totalRawBytes >= this.remainingByteLength) {
        this.streamComplete = true;
      }
      return;
    }

    if (this.totalStreamLength <= 0) return;
    const totalReceived = this.formatInfo
      ? this.totalRawBytes + this.formatInfo.audioDataOffset
      : this.headerBytesReceived;
    if (totalReceived >= this.totalStreamLength) {
      this.streamComplete = true;
    }
  }

  /** Append raw audio bytes (no header) to the decode backlog. */
  private addRawData(data: Uint8Array): void {
    this.rawChunks.push(data);
    this.totalRawBytes += data.length;
  }

  /**
   * Try to decode the next segment of audio.
   *
   * Failure modes:
   * - Decode timeout: retried once, then surfaced as DecodeTimeoutError (typed).
   * - Other decode error (corrupt data, format mismatch): surfaced as DecodeError.
   *
   * Both are thrown rather than silently swallowed; callers (processChunk /
   * markStreamComplete) decide whether to abort the stream or skip the segment.
   * processedBytes advances only on success, so a thrown failure does not
   * silently consume the failed segment.
   *
   * @returns the decoded segment, or null when nothing is decodable yet.
   */
  private async tryDecodeNextSegment(): Promise<DecodedChunkResult | null> {
    const formatInfo = this.formatInfo;
    if (!formatInfo) return null;

    const decoder = this.requireFormatDecoder();
    const availableBytes = this.totalRawBytes - this.processedBytes;

    // Peek the candidate window first, so the aligner can scan for a
    // format-specific frame boundary (FLAC). extractAlignedData is
    // non-destructive: it reads rawChunks without advancing processedBytes.
    const peekSize = Math.min(StreamDecoder.SEGMENT_SIZE, availableBytes);
    if (peekSize <= 0) return null;
    const peekBytes = this.extractAlignedData(peekSize);

    // Passing streamComplete lets the aligner relax the min-frame guard for the
    // final tail; otherwise residual <512-byte tails get dropped. The result is
    // clamped to the peeked length. An oversized value would otherwise advance
    // processedBytes past bytes that were never decoded.
    const alignedSize = Math.min(
      decoder.getAlignedSegmentSize(
        formatInfo,
        availableBytes,
        StreamDecoder.SEGMENT_SIZE,
        this.streamComplete,
        peekBytes
      ),
      peekBytes.length
    );
    if (alignedSize <= 0) return null;

    const segmentOffset = this.processedBytes;
    // Zero-copy view: alignedSize <= peekBytes.length.
    const rawSegment = peekBytes.subarray(0, alignedSize);
    const decodableSegment = decoder.wrapSegment(formatInfo, rawSegment);

    let buffer: AudioBuffer;
    try {
      buffer = await this.decodeWithRetry(decodableSegment, segmentOffset, alignedSize);
    } catch (error) {
      // Re-throw typed errors so the drain loop surfaces the real failure.
      if (error instanceof DecodeTimeoutError || error instanceof DecodeError) {
        throw error;
      }
      // Any other failure during decode is wrapped and surfaced.
      const cause = StreamDecoder.toError(error);
      throw new DecodeError(
        `Decode failed at offset ${segmentOffset} (${alignedSize} bytes): ${cause.message}`,
        segmentOffset,
        alignedSize,
        cause
      );
    }

    // Advance only after a successful decode, then release chunks that are now
    // entirely behind the read position.
    this.processedBytes += alignedSize;
    this.discardConsumedChunks();
    return { buffer, duration: buffer.duration };
  }

  /**
   * Decode with a single retry on timeout. Web Audio's decodeAudioData is
   * occasionally flaky under tab throttling; a retry costs little and recovers
   * the common transient case without dropping the segment.
   *
   * @throws DecodeTimeoutError if both attempts time out.
   * @throws DecodeError for any non-timeout failure on either attempt.
   */
  private async decodeWithRetry(
    segmentData: Uint8Array,
    segmentOffset: number,
    alignedSize: number
  ): Promise<AudioBuffer> {
    try {
      return await this.decodeWithTimeout(segmentData);
    } catch (error) {
      if (!(error instanceof DecodeTimeoutError)) {
        const cause = StreamDecoder.toError(error);
        throw new DecodeError(
          `Decode failed at offset ${segmentOffset} (${alignedSize} bytes): ${cause.message}`,
          segmentOffset,
          alignedSize,
          cause
        );
      }
    }

    console.warn(
      `Decode timeout at offset ${segmentOffset} (${alignedSize} bytes) — retrying once`);
    try {
      return await this.decodeWithTimeout(segmentData);
    } catch (retryError) {
      if (retryError instanceof DecodeTimeoutError) {
        console.error(
          `Decode timeout after retry at offset ${segmentOffset} (${alignedSize} bytes)`);
        throw new DecodeTimeoutError(segmentOffset, alignedSize);
      }
      const cause = StreamDecoder.toError(retryError);
      throw new DecodeError(
        `Decode failed on retry at offset ${segmentOffset} (${alignedSize} bytes): ${cause.message}`,
        segmentOffset,
        alignedSize,
        cause
      );
    }
  }

  /**
   * Copy `size` bytes starting at processedBytes out of rawChunks into a fresh
   * buffer. Non-destructive: neither rawChunks nor processedBytes is modified.
   */
  private extractAlignedData(size: number): Uint8Array {
    const extracted = new Uint8Array(size);
    let written = 0;
    // Stream-relative offset of the chunk currently being examined.
    let chunkStart = this.rawChunksBaseOffset;

    for (const chunk of this.rawChunks) {
      if (written >= size) break;
      const chunkEnd = chunkStart + chunk.length;
      // Skip chunks that lie entirely before the read position.
      if (chunkEnd > this.processedBytes) {
        const from = Math.max(0, this.processedBytes - chunkStart);
        const toCopy = Math.min(chunk.length - from, size - written);
        if (toCopy > 0) {
          extracted.set(chunk.subarray(from, from + toCopy), written);
          written += toCopy;
        }
      }
      chunkStart = chunkEnd;
    }

    return extracted;
  }

  /**
   * Release leading raw chunks that end at or before processedBytes. They can
   * never be read again, so keeping them only pins memory and lengthens the
   * extractAlignedData scan.
   */
  private discardConsumedChunks(): void {
    let dropCount = 0;
    let newBase = this.rawChunksBaseOffset;
    for (const chunk of this.rawChunks) {
      if (newBase + chunk.length > this.processedBytes) break;
      newBase += chunk.length;
      dropCount++;
    }
    if (dropCount > 0) {
      this.rawChunks.splice(0, dropCount);
      this.rawChunksBaseOffset = newBase;
    }
  }

  /**
   * Decode with a timeout to prevent hanging. Throws DecodeTimeoutError if the
   * deadline expires, so callers can distinguish a timeout from a corrupt-data
   * failure (decodeAudioData throws DOMException for the latter).
   *
   * The bytes are copied into a fresh ArrayBuffer on every call, so a retry
   * never reuses a buffer handed to a previous decodeAudioData call.
   */
  private async decodeWithTimeout(audioData: Uint8Array, timeoutMs: number = 5000): Promise<AudioBuffer> {
    const buffer = new ArrayBuffer(audioData.length);
    new Uint8Array(buffer).set(audioData);

    const decodePromise = this.contextManager.decodeAudioData(buffer);
    let timer: ReturnType<typeof setTimeout> | null = null;
    const timeoutPromise = new Promise<never>((_, reject) => {
      // The segment offset is not known here (-1); decodeWithRetry rethrows with the real offset.
      timer = setTimeout(() => reject(new DecodeTimeoutError(-1, audioData.length)), timeoutMs);
    });

    try {
      return await Promise.race([decodePromise, timeoutPromise]);
    } finally {
      if (timer !== null) clearTimeout(timer);
    }
  }

  /**
   * Get the estimated total duration from the parsed format header.
   *
   * Prefers the decoder's header-derived totalDuration (for WAV this is the exact
   * dataSize/byteRate). When the header omits a usable size (e.g. a WAV data
   * chunk size of 0 in a streamed file), the duration is derived from the total
   * stream length minus the header.
   *
   * @returns seconds, or null when no header is parsed, byteRate is unusable, or
   *   the stream length is unknown or smaller than the header. In those cases the
   *   fallback would otherwise produce a negative duration.
   */
  getEstimatedDuration(): number | null {
    const info = this.formatInfo;
    if (!info) return null;

    const declared = info.totalDuration;
    if (declared && declared > 0) {
      return declared;
    }

    if (info.byteRate <= 0) return null;
    // totalStreamLength <= 0 means the length is unknown (no Content-Length).
    if (this.totalStreamLength <= 0) return null;
    const audioDataSize = this.totalStreamLength - info.audioDataOffset;
    if (audioDataSize < 0) return null;
    return audioDataSize / info.byteRate;
  }

  /** Whether the format header has been parsed. */
  get headerParsed(): boolean {
    return this.formatInfo !== null;
  }

  /** Whether all stream data has been received. */
  get isComplete(): boolean {
    return this.streamComplete;
  }

  /**
   * Get the parsed format info (sample rate, channels, audio-data offset, …).
   * Used by the player for seek byte-offset math and header-dependent decisions.
   */
  getFormatInfo(): FormatInfo | null {
    return this.formatInfo;
  }

  /**
   * Calculate the file-absolute byte offset for a seek to the given time position.
   *
   * Delegates to the format decoder. The returned value is a byte position in the
   * file on disk (header included), ready for a Range request.
   *
   * @returns 0 when no header has been parsed yet.
   */
  calculateByteOffset(positionSeconds: number): number {
    if (!this.formatInfo) return 0;
    return this.requireFormatDecoder().calculateByteOffset(this.formatInfo, positionSeconds);
  }

  /**
   * Explicitly mark the stream as complete.
   *
   * Called by the C# streaming loop after ReadAsync returns 0 (no more data).
   * This ensures streamComplete is set even when the server omits Content-Length;
   * in that case updateStreamCompleteFlag can never fire via byte counting.
   * Returns all remaining decoded segments (the tail drain pass).
   *
   * If streamComplete was already true (set by updateStreamCompleteFlag during the
   * final processChunk call), the tail was already drained inside that call. This
   * method then returns immediately, avoiding a second drain pass that would set
   * streamingCompleted = true even if the first drain had a partial failure.
   */
  async markStreamComplete(): Promise<DecodedChunkResult[]> {
    if (this.streamComplete) {
      return [];
    }
    this.streamComplete = true;
    return this.drainDecodableSegments();
  }

  /**
   * Reset decoder state. The format decoder is retained, because a stream's
   * format does not change across reset; a new stream supplies a fresh decoder
   * via initialize().
   */
  reset(): void {
    this.formatInfo = null;
    this.totalStreamLength = 0;
    this.clearStreamBuffers();
    this.isContinuation = false;
    this.remainingByteLength = 0;
  }

  /**
   * Reinitialize for a Range-continuation stream after seek-beyond-buffer.
   *
   * The server responds to a Range request with 206 Partial Content carrying raw
   * audio from a file-absolute offset; there is NO header in this body. The
   * FormatInfo parsed from the initial stream is retained (its format describes
   * every segment the decoder wraps via wrapSegment). The entire 206 body is fed
   * straight into the decode pipeline: with formatInfo set, processChunk routes
   * every chunk to addRawData and tryParseHeader is never re-entered.
   *
   * processedBytes restarts at 0, so segment offsets reported in decode errors
   * are relative to the start of the 206 body.
   *
   * NOTE(review): if called before any header was parsed (formatInfo still null),
   * the headerless 206 body would go through the header search instead.
   *
   * @param remainingByteLength the Content-Length of the 206 response: the number
   *   of bytes from the range start to EOF, NOT the full file size. The stream is
   *   complete when totalRawBytes >= this value.
   */
  reinitializeForRangeContinuation(remainingByteLength: number): void {
    // Retain this.formatInfo and this.formatDecoder, since the 206 body carries no header to reparse.
    this.clearStreamBuffers();
    this.isContinuation = true;
    this.remainingByteLength = remainingByteLength;
  }

  /**
   * Drop all buffered bytes and per-stream progress. Shared by initialize(),
   * reset() and reinitializeForRangeContinuation(); each then sets its own
   * format/continuation state.
   */
  private clearStreamBuffers(): void {
    this.rawChunks = [];
    this.rawChunksBaseOffset = 0;
    this.totalRawBytes = 0;
    this.processedBytes = 0;
    this.streamComplete = false;
    this.headerBytesReceived = 0;
    this.headerSearchChunks = [];
    this.headerError = null;
  }

  /** Return the format decoder, failing clearly if initialize() was never called. */
  private requireFormatDecoder(): IFormatDecoder {
    if (!this.formatDecoder) {
      throw new Error('StreamDecoder has no format decoder — call initialize() first');
    }
    return this.formatDecoder;
  }

  /** Normalise an unknown thrown value into an Error, so `.message` is always meaningful. */
  private static toError(value: unknown): Error {
    return value instanceof Error ? value : new Error(String(value));
  }
}
|