Fix Critical: streaming race, dirty buffer, dropped tail, fragmented header

- SeekBeyondBuffer and LoadTrackStreaming assign _activeStreamingTask before
  awaiting; DrainActiveStreamingTaskAsync awaits previous task before new
  stream starts, closing the concurrent-seek race on the JS StreamDecoder
- Always slice ArrayPool buffer to currentBytes before sending to JS interop;
  eliminates stale bytes from prior rentals reaching the audio decoder
- getSampleAlignedChunkSize accepts streamComplete flag; bypasses minimum
  chunk guard on final tail so trailing bytes are decoded, not dropped
- StreamDecoder accumulates headerSearchChunks until parseHeader succeeds,
  with 256 KB MAX_HEADER_SEARCH_BYTES bound; handles fragmented first chunks
  and extended WAV headers with LIST/INFO/JUNK chunks
- markStreamComplete early-returns when streamComplete already set to prevent
  double-drain and incorrect streamingCompleted flag after partial failure
- processedBytes advances only after successful decode; failed segments leave
  cursor in place rather than permanently skipping audio
- AudioInteropService.MarkStreamCompleteAsync wires C# loop exit to JS decoder
  ensuring tail drain fires even when Content-Length header is absent
This commit is contained in:
Daniel Harvey
2026-05-17 11:28:53 -04:00
parent 56d15027e4
commit dd96caa709
6 changed files with 277 additions and 42 deletions
@@ -56,6 +56,11 @@ public class AudioInteropService : IAsyncDisposable
return await InvokeJsAsync<AudioOperationResult>("DeepDrftAudio.startStreamingPlayback", playerId);
}
public async Task<AudioOperationResult> MarkStreamCompleteAsync(string playerId)
{
return await InvokeJsAsync<AudioOperationResult>("DeepDrftAudio.markStreamComplete", playerId);
}
public async Task<AudioOperationResult> EnsureAudioContextReady(string playerId)
{
return await InvokeJsAsync<AudioOperationResult>("DeepDrftAudio.ensureAudioContextReady", playerId);
@@ -27,6 +27,7 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
private bool _streamingPlaybackStarted = false;
private CancellationTokenSource? _streamingCancellation;
private Task? _activeStreamingTask;
private DateTime _lastNotification = DateTime.MinValue;
private readonly ILogger<StreamingAudioPlayerService> _logger;
private string? _currentTrackId;
@@ -62,7 +63,11 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
private async Task LoadTrackStreaming(TrackEntity track)
{
// Always reset to clean state before loading new track
// Always reset to clean state before loading new track. ResetToIdle
// both cancels and awaits any in-flight streaming loop, so by the time
// we return from it the previous loop is guaranteed to have exited and
// there is no risk of interleaved ProcessStreamingChunk calls against
// the single-instance JS StreamDecoder.
await ResetToIdle();
// Save track ID for seek operations
@@ -116,7 +121,8 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
return;
}
await StreamAudioWithEarlyPlayback(audio, _streamingCancellation.Token);
_activeStreamingTask = StreamAudioWithEarlyPlayback(audio, _streamingCancellation.Token);
await _activeStreamingTask;
}
catch (OperationCanceledException)
{
@@ -163,9 +169,12 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
{
totalBytesRead += currentBytes;
// Use only the actual bytes read, no copying needed
var actualBuffer = currentBytes == _currentBufferSize ? buffer : buffer[..currentBytes];
// Always slice to the exact number of bytes read. The pooled buffer
// is rented at MaxBufferSize and may carry stale bytes past
// currentBytes from a prior rental — handing the full array to JS
// interop would serialise that garbage into the audio stream.
var actualBuffer = buffer.AsSpan(0, currentBytes).ToArray();
// Process chunk for streaming
var chunkResult = await _audioInterop.ProcessStreamingChunk(PlayerId, actualBuffer);
if (!chunkResult.Success)
@@ -217,7 +226,13 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
await ThrottledNotifyStateChanged();
}
} while (currentBytes > 0);
// Notify the JS decoder that the stream is finished. When the server omits
// Content-Length the StreamDecoder cannot determine completion via byte counting
// alone; this explicit signal ensures the tail-decoding path (streamComplete=true)
// fires regardless of whether Content-Length was present.
await _audioInterop.MarkStreamCompleteAsync(PlayerId);
// Mark as fully loaded
LoadProgress = 1.0;
await NotifyStateChanged();
@@ -314,8 +329,13 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
IsSeekingBeyondBuffer = true;
// Cancel current streaming
// Cancel the current streaming loop AND wait for it to fully exit before
// starting a new one. The previous loop's pending ReadAsync will throw
// OperationCanceledException asynchronously; if we kick off a new loop
// immediately, both can race against the single-instance JS StreamDecoder
// and corrupt decode state. Draining here is the load-bearing guarantee.
_streamingCancellation?.Cancel();
await DrainActiveStreamingTaskAsync();
_streamingCancellation?.Dispose();
_streamingCancellation = new CancellationTokenSource();
@@ -355,7 +375,8 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
BufferedChunks = 0;
// Stream audio from offset
await StreamAudioWithEarlyPlayback(audio, _streamingCancellation.Token);
_activeStreamingTask = StreamAudioWithEarlyPlayback(audio, _streamingCancellation.Token);
await _activeStreamingTask;
IsSeekingBeyondBuffer = false;
}
@@ -379,8 +400,11 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
/// </summary>
private async Task ResetToIdle()
{
// 1. Cancel any ongoing streaming operation
// 1. Cancel any ongoing streaming operation and wait for it to exit
// before tearing down JS state. Otherwise the loop's pending
// ProcessStreamingChunk call can land after StopAsync/UnloadAsync.
_streamingCancellation?.Cancel();
await DrainActiveStreamingTaskAsync();
_streamingCancellation?.Dispose();
_streamingCancellation = null;
@@ -417,6 +441,40 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
await NotifyStateChanged();
}
/// <summary>
/// Wait for the previously-started streaming loop to fully exit. The caller
/// must have already cancelled <see cref="_streamingCancellation"/>. Swallows
/// the expected OperationCanceledException; any other exception was already
/// surfaced through the loop's own catch block, so we ignore it here too.
/// </summary>
private async Task DrainActiveStreamingTaskAsync()
{
var task = _activeStreamingTask;
if (task == null) return;
try
{
await task;
}
catch (OperationCanceledException)
{
// Expected when we cancelled the loop ourselves.
}
catch
{
// Any other failure was already logged inside the loop.
}
finally
{
// Only clear if we are still the active task — a concurrent caller
// may have started a new stream while we were draining the old one.
if (ReferenceEquals(_activeStreamingTask, task))
{
_activeStreamingTask = null;
}
}
}
private async Task ThrottledNotifyStateChanged()
{
var now = DateTime.UtcNow;
+31 -3
View File
@@ -107,12 +107,40 @@ export class AudioPlayer {
}
}
/**
* Signal to the decoder that the C# streaming loop has finished sending bytes.
* This sets streamComplete=true and flushes any remaining decoded tail segments.
* Must be called after the ReadAsync loop exits, regardless of whether
* Content-Length was known — without it the tail-decode path is dead when
* Content-Length is absent.
*/
async markStreamComplete(): Promise<StreamingResult> {
try {
const results = await this.streamDecoder.markStreamComplete();
if (results.length > 0) {
for (const result of results) {
this.scheduler.addBuffer(result.buffer);
}
if (this.streamingStarted && this.isPlaying) {
this.scheduler.scheduleNewBuffers();
}
}
this.streamingCompleted = true;
console.log('Stream marked complete by C# signal');
return { success: true, bufferCount: this.scheduler.getBufferCount() };
} catch (error) {
return { success: false, error: (error as Error).message };
}
}
async processStreamingChunk(chunk: Uint8Array): Promise<StreamingResult> {
try {
const result = await this.streamDecoder.processChunk(chunk);
const results = await this.streamDecoder.processChunk(chunk);
if (result) {
this.scheduler.addBuffer(result.buffer);
if (results.length > 0) {
for (const result of results) {
this.scheduler.addBuffer(result.buffer);
}
// Update duration estimate
const estimatedDuration = this.streamDecoder.getEstimatedDuration();
+152 -23
View File
@@ -13,13 +13,26 @@ export interface DecodedChunkResult {
}
export class StreamDecoder {
// Upper bound on pre-header accumulation. 256 KB is far beyond any sane WAV
// header (including extended LIST/INFO/JUNK chunks). If we have accumulated
// this many bytes without finding a valid header the stream is corrupt.
private static readonly MAX_HEADER_SEARCH_BYTES = 256 * 1024;
private contextManager: AudioContextManager;
private wavHeader: WavHeader | null = null;
private rawChunks: Uint8Array[] = [];
private totalRawBytes: number = 0;
private processedBytes: number = 0;
private isFirstChunk: boolean = true;
private totalStreamLength: number = 0;
private streamComplete: boolean = false;
private headerError: string | null = null;
// Pre-header accumulator. WAV headers can span multiple network chunks
// (small first segment, extended LIST/INFO/JUNK chunks before 'data', etc.),
// so we buffer raw bytes here until parseHeader succeeds rather than assuming
// the whole header lives in the first chunk.
private headerBytesReceived: number = 0;
private headerSearchChunks: Uint8Array[] = [];
constructor(contextManager: AudioContextManager) {
this.contextManager = contextManager;
@@ -33,34 +46,78 @@ export class StreamDecoder {
this.rawChunks = [];
this.totalRawBytes = 0;
this.processedBytes = 0;
this.isFirstChunk = true;
this.totalStreamLength = totalStreamLength;
this.streamComplete = false;
this.headerBytesReceived = 0;
this.headerSearchChunks = [];
this.headerError = null;
console.log(`StreamDecoder initialized: expecting ${totalStreamLength} bytes`);
}
/**
* Process incoming chunk and return decoded AudioBuffer if ready
* Process incoming chunk and return all decoded AudioBuffers ready so far.
*
* Returns an array (possibly empty) rather than a single result because the
* final chunk may unlock the residual tail in addition to a full segment,
* and a single chunk that completes header parsing may also carry enough
* audio data to decode immediately.
*/
async processChunk(chunk: Uint8Array): Promise<DecodedChunkResult | null> {
if (this.isFirstChunk) {
await this.handleFirstChunk(chunk);
this.isFirstChunk = false;
async processChunk(chunk: Uint8Array): Promise<DecodedChunkResult[]> {
// If the header search already failed (corrupt/non-WAV stream), stop processing.
if (this.headerError) {
throw new Error(this.headerError);
}
if (!this.wavHeader) {
await this.tryParseHeader(chunk);
// Check again: tryParseHeader may have just set headerError.
if (this.headerError) {
throw new Error(this.headerError);
}
} else {
this.addRawData(chunk);
}
return this.tryDecodeNextSegment();
this.updateStreamCompleteFlag();
const results: DecodedChunkResult[] = [];
// Drain all currently-decodable segments. Without this loop, a single
// processChunk call returns at most one segment; the trailing tail
// unlocked once streamComplete flips true would never be flushed.
while (true) {
const segment = await this.tryDecodeNextSegment();
if (!segment) break;
results.push(segment);
}
return results;
}
/**
* Handle first chunk - extract WAV header and setup AudioContext
* Accumulate bytes into the header-search buffer and retry parseHeader.
* Once a header is recognised, anything past headerSize becomes audio data.
*/
private async handleFirstChunk(chunk: Uint8Array): Promise<void> {
console.log('\n--- Processing first chunk ---');
private async tryParseHeader(chunk: Uint8Array): Promise<void> {
this.headerSearchChunks.push(chunk);
this.headerBytesReceived += chunk.length;
const header = WavUtils.parseHeader([chunk], chunk.length);
// Guard against unbounded accumulation from a corrupt or non-WAV stream.
if (this.headerBytesReceived > StreamDecoder.MAX_HEADER_SEARCH_BYTES) {
this.headerError = `WAV header not found after ${this.headerBytesReceived} bytes — stream may be corrupt or not a WAV file`;
console.error(this.headerError);
// Drop the search buffer so subsequent chunks are not accumulated either.
this.headerSearchChunks = [];
this.headerBytesReceived = 0;
return;
}
const header = WavUtils.parseHeader(this.headerSearchChunks, this.headerBytesReceived);
if (!header) {
throw new Error('Invalid WAV header in first chunk');
// Not enough bytes yet — wait for the next chunk. If the stream ends
// without ever producing a valid header, the final processChunk will
// mark streamComplete and the player will report no audio decoded;
// that is the correct failure mode, since there is no audio to play.
console.log(`Header not yet parsable: ${this.headerBytesReceived} bytes accumulated`);
return;
}
this.wavHeader = header;
@@ -72,10 +129,39 @@ export class StreamDecoder {
await this.contextManager.recreateWithSampleRate(header.sampleRate);
}
// Extract audio data (skip WAV header)
const audioData = chunk.subarray(header.headerSize);
this.addRawData(audioData);
console.log(`Extracted ${audioData.length} bytes of audio data`);
// Concatenate all header-search chunks and push the audio-data tail
// (everything past headerSize) into the raw audio buffer.
const concatenated = new Uint8Array(this.headerBytesReceived);
let offset = 0;
for (const c of this.headerSearchChunks) {
concatenated.set(c, offset);
offset += c.length;
}
const audioData = concatenated.subarray(header.headerSize);
if (audioData.length > 0) {
this.addRawData(audioData);
}
console.log(`Extracted ${audioData.length} bytes of audio data from header buffer`);
// Header-search buffer no longer needed.
this.headerSearchChunks = [];
this.headerBytesReceived = 0;
}
/**
* Mark the stream complete once we've received all expected bytes. The
* computation must account for whichever stage of header parsing we're in:
* if a header has been parsed, raw audio bytes are tracked separately;
* otherwise pre-header bytes count toward the total.
*/
private updateStreamCompleteFlag(): void {
if (this.totalStreamLength <= 0) return;
const totalReceived = this.wavHeader
? this.totalRawBytes + this.wavHeader.headerSize
: this.headerBytesReceived;
if (totalReceived >= this.totalStreamLength) {
this.streamComplete = true;
}
}
/**
@@ -94,10 +180,18 @@ export class StreamDecoder {
const segmentSize = 64 * 1024; // 64KB segments
const availableBytes = this.totalRawBytes - this.processedBytes;
const alignedSize = WavUtils.getSampleAlignedChunkSize(this.wavHeader, segmentSize, availableBytes);
// Passing streamComplete lets the aligner relax the min-frame guard
// for the final tail; otherwise residual <512-byte tails get dropped.
const alignedSize = WavUtils.getSampleAlignedChunkSize(
this.wavHeader,
segmentSize,
availableBytes,
this.streamComplete
);
if (alignedSize <= 0) return null;
const segmentOffset = this.processedBytes;
console.log(`\n--- Decoding segment ---`);
console.log(`Available: ${availableBytes} bytes, aligned size: ${alignedSize} bytes`);
@@ -106,10 +200,13 @@ export class StreamDecoder {
try {
const buffer = await this.decodeWithTimeout(wavFile);
// Advance only after a successful decode so that a timeout or decode
// failure does not permanently skip the segment.
this.processedBytes += alignedSize;
console.log(`✓ Decoded: ${buffer.duration.toFixed(3)}s, ${buffer.numberOfChannels}ch`);
return { buffer, duration: buffer.duration };
} catch (error) {
console.error('Failed to decode segment:', error);
console.error(`Failed to decode segment at offset ${segmentOffset}:`, error);
return null;
}
}
@@ -145,7 +242,6 @@ export class StreamDecoder {
currentPos += chunk.length;
}
this.processedBytes += size;
return extracted;
}
@@ -199,7 +295,7 @@ export class StreamDecoder {
* Check if all stream data has been received
*/
get isComplete(): boolean {
return this.totalStreamLength > 0 && this.totalRawBytes >= (this.totalStreamLength - (this.wavHeader?.headerSize ?? 0));
return this.streamComplete;
}
/**
@@ -221,6 +317,33 @@ export class StreamDecoder {
return Math.floor(rawOffset / this.wavHeader.blockAlign) * this.wavHeader.blockAlign;
}
/**
* Explicitly mark the stream as complete.
*
* Called by the C# streaming loop after ReadAsync returns 0 (no more data).
* This ensures streamComplete is set even when the server omits Content-Length,
* which prevents updateStreamCompleteFlag from ever firing via byte counting.
* Returns all remaining decoded segments (the tail drain pass).
*
* If streamComplete was already true (set by updateStreamCompleteFlag during the
* final processChunk call), the tail was already drained inside that call's
* while(true) loop — return immediately to avoid a second drain pass that would
* set streamingCompleted = true even if the first drain had a partial failure.
*/
async markStreamComplete(): Promise<DecodedChunkResult[]> {
if (this.streamComplete) {
return [];
}
this.streamComplete = true;
const results: DecodedChunkResult[] = [];
while (true) {
const segment = await this.tryDecodeNextSegment();
if (!segment) break;
results.push(segment);
}
return results;
}
/**
* Reset decoder state
*/
@@ -229,8 +352,11 @@ export class StreamDecoder {
this.rawChunks = [];
this.totalRawBytes = 0;
this.processedBytes = 0;
this.isFirstChunk = true;
this.totalStreamLength = 0;
this.streamComplete = false;
this.headerBytesReceived = 0;
this.headerSearchChunks = [];
this.headerError = null;
}
/**
@@ -242,8 +368,11 @@ export class StreamDecoder {
this.rawChunks = [];
this.totalRawBytes = 0;
this.processedBytes = 0;
this.isFirstChunk = true;
this.totalStreamLength = totalStreamLength;
this.streamComplete = false;
this.headerBytesReceived = 0;
this.headerSearchChunks = [];
this.headerError = null;
// wavHeader will be reparsed from the new stream (server sends fresh header)
this.wavHeader = null;
console.log(`StreamDecoder reinitialized for offset: expecting ${totalStreamLength} bytes`);
+6
View File
@@ -45,6 +45,12 @@ const DeepDrftAudio = {
return player.startStreamingPlayback();
},
markStreamComplete: async (playerId: string): Promise<StreamingResult> => {
const player = audioPlayers.get(playerId);
if (!player) return { success: false, error: 'Player not found' };
return player.markStreamComplete();
},
ensureAudioContextReady: async (playerId: string): Promise<AudioResult> => {
const player = audioPlayers.get(playerId);
if (!player) return { success: false, error: 'Player not found' };
+16 -7
View File
@@ -173,17 +173,26 @@ class WavUtils {
buffer[43] = (audioDataSize >> 24) & 0xFF;
}
static getSampleAlignedChunkSize(header: WavHeader, maxChunkSize: number, availableDataSize: number): number {
static getSampleAlignedChunkSize(header: WavHeader, maxChunkSize: number, availableDataSize: number, streamComplete: boolean = false): number {
const frameSize = header.blockAlign;
// Much smaller minimum for streaming - just enough for Web Audio API
// Much smaller minimum for streaming - just enough for Web Audio API.
// The minimum exists to avoid decoding partial-frame artifacts on
// mid-stream chunks while the rest is still in flight. Once the stream
// is fully received, we must drain whatever remains regardless of size,
// otherwise the trailing tail (often <512 bytes) is silently lost.
const minAudioBytes = Math.max(512, frameSize * 10); // At least 512 bytes or 10 frames
// If we don't have enough data, return 0 to wait for more
if (availableDataSize < minAudioBytes) {
// Mid-stream guard: wait for more data if below minimum.
if (!streamComplete && availableDataSize < minAudioBytes) {
return 0;
}
// Even when complete we still need at least one full frame to decode.
if (availableDataSize < frameSize) {
return 0;
}
// Calculate frames for the available data
const requestedSize = Math.min(maxChunkSize, availableDataSize);
const frames = Math.floor(requestedSize / frameSize);