diff --git a/DeepDrftWeb.Client/Services/AudioInteropService.cs b/DeepDrftWeb.Client/Services/AudioInteropService.cs index 148193d..8b85204 100644 --- a/DeepDrftWeb.Client/Services/AudioInteropService.cs +++ b/DeepDrftWeb.Client/Services/AudioInteropService.cs @@ -56,6 +56,11 @@ public class AudioInteropService : IAsyncDisposable return await InvokeJsAsync("DeepDrftAudio.startStreamingPlayback", playerId); } + public async Task MarkStreamCompleteAsync(string playerId) + { + return await InvokeJsAsync("DeepDrftAudio.markStreamComplete", playerId); + } + public async Task EnsureAudioContextReady(string playerId) { return await InvokeJsAsync("DeepDrftAudio.ensureAudioContextReady", playerId); diff --git a/DeepDrftWeb.Client/Services/StreamingAudioPlayerService.cs b/DeepDrftWeb.Client/Services/StreamingAudioPlayerService.cs index 8ee643f..707854e 100644 --- a/DeepDrftWeb.Client/Services/StreamingAudioPlayerService.cs +++ b/DeepDrftWeb.Client/Services/StreamingAudioPlayerService.cs @@ -27,6 +27,7 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS private bool _streamingPlaybackStarted = false; private CancellationTokenSource? _streamingCancellation; + private Task? _activeStreamingTask; private DateTime _lastNotification = DateTime.MinValue; private readonly ILogger _logger; private string? _currentTrackId; @@ -62,7 +63,11 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS private async Task LoadTrackStreaming(TrackEntity track) { - // Always reset to clean state before loading new track + // Always reset to clean state before loading new track. ResetToIdle + // both cancels and awaits any in-flight streaming loop, so by the time + // we return from it the previous loop is guaranteed to have exited and + // there is no risk of interleaved ProcessStreamingChunk calls against + // the single-instance JS StreamDecoder. await ResetToIdle(); // Save track ID for seek operations @@ -116,7 +121,8 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS return; } - await StreamAudioWithEarlyPlayback(audio, _streamingCancellation.Token); + _activeStreamingTask = StreamAudioWithEarlyPlayback(audio, _streamingCancellation.Token); + await _activeStreamingTask; } catch (OperationCanceledException) { @@ -163,9 +169,12 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS { totalBytesRead += currentBytes; - // Use only the actual bytes read, no copying needed - var actualBuffer = currentBytes == _currentBufferSize ? buffer : buffer[..currentBytes]; - + // Always slice to the exact number of bytes read. The pooled buffer + // is rented at MaxBufferSize and may carry stale bytes past + // currentBytes from a prior rental — handing the full array to JS + // interop would serialise that garbage into the audio stream. + var actualBuffer = buffer.AsSpan(0, currentBytes).ToArray(); + // Process chunk for streaming var chunkResult = await _audioInterop.ProcessStreamingChunk(PlayerId, actualBuffer); if (!chunkResult.Success) @@ -217,7 +226,13 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS await ThrottledNotifyStateChanged(); } } while (currentBytes > 0); - + + // Notify the JS decoder that the stream is finished. When the server omits + // Content-Length the StreamDecoder cannot determine completion via byte counting + // alone; this explicit signal ensures the tail-decoding path (streamComplete=true) + // fires regardless of whether Content-Length was present. + await _audioInterop.MarkStreamCompleteAsync(PlayerId); + // Mark as fully loaded LoadProgress = 1.0; await NotifyStateChanged(); @@ -314,8 +329,13 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS IsSeekingBeyondBuffer = true; - // Cancel current streaming + // Cancel the current streaming loop AND wait for it to fully exit before + // starting a new one. The previous loop's pending ReadAsync will throw + // OperationCanceledException asynchronously; if we kick off a new loop + // immediately, both can race against the single-instance JS StreamDecoder + // and corrupt decode state. Draining here is the load-bearing guarantee. _streamingCancellation?.Cancel(); + await DrainActiveStreamingTaskAsync(); _streamingCancellation?.Dispose(); _streamingCancellation = new CancellationTokenSource(); @@ -355,7 +375,8 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS BufferedChunks = 0; // Stream audio from offset - await StreamAudioWithEarlyPlayback(audio, _streamingCancellation.Token); + _activeStreamingTask = StreamAudioWithEarlyPlayback(audio, _streamingCancellation.Token); + await _activeStreamingTask; IsSeekingBeyondBuffer = false; } @@ -379,8 +400,11 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS /// private async Task ResetToIdle() { - // 1. Cancel any ongoing streaming operation + // 1. Cancel any ongoing streaming operation and wait for it to exit + // before tearing down JS state. Otherwise the loop's pending + // ProcessStreamingChunk call can land after StopAsync/UnloadAsync. _streamingCancellation?.Cancel(); + await DrainActiveStreamingTaskAsync(); _streamingCancellation?.Dispose(); _streamingCancellation = null; @@ -417,6 +441,40 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS await NotifyStateChanged(); } + /// + /// Wait for the previously-started streaming loop to fully exit. The caller + /// must have already cancelled . Swallows + /// the expected OperationCanceledException; any other exception was already + /// surfaced through the loop's own catch block, so we ignore it here too. + /// + private async Task DrainActiveStreamingTaskAsync() + { + var task = _activeStreamingTask; + if (task == null) return; + + try + { + await task; + } + catch (OperationCanceledException) + { + // Expected when we cancelled the loop ourselves. + } + catch + { + // Any other failure was already logged inside the loop. + } + finally + { + // Only clear if we are still the active task — a concurrent caller + // may have started a new stream while we were draining the old one. + if (ReferenceEquals(_activeStreamingTask, task)) + { + _activeStreamingTask = null; + } + } + } + private async Task ThrottledNotifyStateChanged() { var now = DateTime.UtcNow; diff --git a/DeepDrftWeb/Interop/audio/AudioPlayer.ts b/DeepDrftWeb/Interop/audio/AudioPlayer.ts index b4e937a..21eae34 100644 --- a/DeepDrftWeb/Interop/audio/AudioPlayer.ts +++ b/DeepDrftWeb/Interop/audio/AudioPlayer.ts @@ -107,12 +107,40 @@ export class AudioPlayer { } } + /** + * Signal to the decoder that the C# streaming loop has finished sending bytes. + * This sets streamComplete=true and flushes any remaining decoded tail segments. + * Must be called after the ReadAsync loop exits, regardless of whether + * Content-Length was known — without it the tail-decode path is dead when + * Content-Length is absent. + */ + async markStreamComplete(): Promise { + try { + const results = await this.streamDecoder.markStreamComplete(); + if (results.length > 0) { + for (const result of results) { + this.scheduler.addBuffer(result.buffer); + } + if (this.streamingStarted && this.isPlaying) { + this.scheduler.scheduleNewBuffers(); + } + } + this.streamingCompleted = true; + console.log('Stream marked complete by C# signal'); + return { success: true, bufferCount: this.scheduler.getBufferCount() }; + } catch (error) { + return { success: false, error: (error as Error).message }; + } + } + async processStreamingChunk(chunk: Uint8Array): Promise { try { - const result = await this.streamDecoder.processChunk(chunk); + const results = await this.streamDecoder.processChunk(chunk); - if (result) { - this.scheduler.addBuffer(result.buffer); + if (results.length > 0) { + for (const result of results) { + this.scheduler.addBuffer(result.buffer); + } // Update duration estimate const estimatedDuration = this.streamDecoder.getEstimatedDuration(); diff --git a/DeepDrftWeb/Interop/audio/StreamDecoder.ts b/DeepDrftWeb/Interop/audio/StreamDecoder.ts index 3f4b68a..6a1a3a4 100644 --- a/DeepDrftWeb/Interop/audio/StreamDecoder.ts +++ b/DeepDrftWeb/Interop/audio/StreamDecoder.ts @@ -13,13 +13,26 @@ export interface DecodedChunkResult { } export class StreamDecoder { + // Upper bound on pre-header accumulation. 256 KB is far beyond any sane WAV + // header (including extended LIST/INFO/JUNK chunks). If we have accumulated + // this many bytes without finding a valid header the stream is corrupt. + private static readonly MAX_HEADER_SEARCH_BYTES = 256 * 1024; + private contextManager: AudioContextManager; private wavHeader: WavHeader | null = null; private rawChunks: Uint8Array[] = []; private totalRawBytes: number = 0; private processedBytes: number = 0; - private isFirstChunk: boolean = true; private totalStreamLength: number = 0; + private streamComplete: boolean = false; + private headerError: string | null = null; + + // Pre-header accumulator. WAV headers can span multiple network chunks + // (small first segment, extended LIST/INFO/JUNK chunks before 'data', etc.), + // so we buffer raw bytes here until parseHeader succeeds rather than assuming + // the whole header lives in the first chunk. + private headerBytesReceived: number = 0; + private headerSearchChunks: Uint8Array[] = []; constructor(contextManager: AudioContextManager) { this.contextManager = contextManager; @@ -33,34 +46,78 @@ export class StreamDecoder { this.rawChunks = []; this.totalRawBytes = 0; this.processedBytes = 0; - this.isFirstChunk = true; this.totalStreamLength = totalStreamLength; + this.streamComplete = false; + this.headerBytesReceived = 0; + this.headerSearchChunks = []; + this.headerError = null; console.log(`StreamDecoder initialized: expecting ${totalStreamLength} bytes`); } /** - * Process incoming chunk and return decoded AudioBuffer if ready + * Process incoming chunk and return all decoded AudioBuffers ready so far. + * + * Returns an array (possibly empty) rather than a single result because the + * final chunk may unlock the residual tail in addition to a full segment, + * and a single chunk that completes header parsing may also carry enough + * audio data to decode immediately. */ - async processChunk(chunk: Uint8Array): Promise { - if (this.isFirstChunk) { - await this.handleFirstChunk(chunk); - this.isFirstChunk = false; + async processChunk(chunk: Uint8Array): Promise { + // If the header search already failed (corrupt/non-WAV stream), stop processing. + if (this.headerError) { + throw new Error(this.headerError); + } + + if (!this.wavHeader) { + await this.tryParseHeader(chunk); + // Check again: tryParseHeader may have just set headerError. + if (this.headerError) { + throw new Error(this.headerError); + } } else { this.addRawData(chunk); } - return this.tryDecodeNextSegment(); + this.updateStreamCompleteFlag(); + + const results: DecodedChunkResult[] = []; + // Drain all currently-decodable segments. Without this loop, a single + // processChunk call returns at most one segment; the trailing tail + // unlocked once streamComplete flips true would never be flushed. + while (true) { + const segment = await this.tryDecodeNextSegment(); + if (!segment) break; + results.push(segment); + } + return results; } /** - * Handle first chunk - extract WAV header and setup AudioContext + * Accumulate bytes into the header-search buffer and retry parseHeader. + * Once a header is recognised, anything past headerSize becomes audio data. */ - private async handleFirstChunk(chunk: Uint8Array): Promise { - console.log('\n--- Processing first chunk ---'); + private async tryParseHeader(chunk: Uint8Array): Promise { + this.headerSearchChunks.push(chunk); + this.headerBytesReceived += chunk.length; - const header = WavUtils.parseHeader([chunk], chunk.length); + // Guard against unbounded accumulation from a corrupt or non-WAV stream. + if (this.headerBytesReceived > StreamDecoder.MAX_HEADER_SEARCH_BYTES) { + this.headerError = `WAV header not found after ${this.headerBytesReceived} bytes — stream may be corrupt or not a WAV file`; + console.error(this.headerError); + // Drop the search buffer so subsequent chunks are not accumulated either. + this.headerSearchChunks = []; + this.headerBytesReceived = 0; + return; + } + + const header = WavUtils.parseHeader(this.headerSearchChunks, this.headerBytesReceived); if (!header) { - throw new Error('Invalid WAV header in first chunk'); + // Not enough bytes yet — wait for the next chunk. If the stream ends + // without ever producing a valid header, the final processChunk will + // mark streamComplete and the player will report no audio decoded; + // that is the correct failure mode, since there is no audio to play. + console.log(`Header not yet parsable: ${this.headerBytesReceived} bytes accumulated`); + return; } this.wavHeader = header; @@ -72,10 +129,39 @@ export class StreamDecoder { await this.contextManager.recreateWithSampleRate(header.sampleRate); } - // Extract audio data (skip WAV header) - const audioData = chunk.subarray(header.headerSize); - this.addRawData(audioData); - console.log(`Extracted ${audioData.length} bytes of audio data`); + // Concatenate all header-search chunks and push the audio-data tail + // (everything past headerSize) into the raw audio buffer. + const concatenated = new Uint8Array(this.headerBytesReceived); + let offset = 0; + for (const c of this.headerSearchChunks) { + concatenated.set(c, offset); + offset += c.length; + } + const audioData = concatenated.subarray(header.headerSize); + if (audioData.length > 0) { + this.addRawData(audioData); + } + console.log(`Extracted ${audioData.length} bytes of audio data from header buffer`); + + // Header-search buffer no longer needed. + this.headerSearchChunks = []; + this.headerBytesReceived = 0; + } + + /** + * Mark the stream complete once we've received all expected bytes. The + * computation must account for whichever stage of header parsing we're in: + * if a header has been parsed, raw audio bytes are tracked separately; + * otherwise pre-header bytes count toward the total. + */ + private updateStreamCompleteFlag(): void { + if (this.totalStreamLength <= 0) return; + const totalReceived = this.wavHeader + ? this.totalRawBytes + this.wavHeader.headerSize + : this.headerBytesReceived; + if (totalReceived >= this.totalStreamLength) { + this.streamComplete = true; + } } /** @@ -94,10 +180,18 @@ export class StreamDecoder { const segmentSize = 64 * 1024; // 64KB segments const availableBytes = this.totalRawBytes - this.processedBytes; - const alignedSize = WavUtils.getSampleAlignedChunkSize(this.wavHeader, segmentSize, availableBytes); + // Passing streamComplete lets the aligner relax the min-frame guard + // for the final tail; otherwise residual <512-byte tails get dropped. + const alignedSize = WavUtils.getSampleAlignedChunkSize( + this.wavHeader, + segmentSize, + availableBytes, + this.streamComplete + ); if (alignedSize <= 0) return null; + const segmentOffset = this.processedBytes; console.log(`\n--- Decoding segment ---`); console.log(`Available: ${availableBytes} bytes, aligned size: ${alignedSize} bytes`); @@ -106,10 +200,13 @@ export class StreamDecoder { try { const buffer = await this.decodeWithTimeout(wavFile); + // Advance only after a successful decode so that a timeout or decode + // failure does not permanently skip the segment. + this.processedBytes += alignedSize; console.log(`✓ Decoded: ${buffer.duration.toFixed(3)}s, ${buffer.numberOfChannels}ch`); return { buffer, duration: buffer.duration }; } catch (error) { - console.error('Failed to decode segment:', error); + console.error(`Failed to decode segment at offset ${segmentOffset}:`, error); return null; } } @@ -145,7 +242,6 @@ export class StreamDecoder { currentPos += chunk.length; } - this.processedBytes += size; return extracted; } @@ -199,7 +295,7 @@ export class StreamDecoder { * Check if all stream data has been received */ get isComplete(): boolean { - return this.totalStreamLength > 0 && this.totalRawBytes >= (this.totalStreamLength - (this.wavHeader?.headerSize ?? 0)); + return this.streamComplete; } /** @@ -221,6 +317,33 @@ export class StreamDecoder { return Math.floor(rawOffset / this.wavHeader.blockAlign) * this.wavHeader.blockAlign; } + /** + * Explicitly mark the stream as complete. + * + * Called by the C# streaming loop after ReadAsync returns 0 (no more data). + * This ensures streamComplete is set even when the server omits Content-Length, + * which prevents updateStreamCompleteFlag from ever firing via byte counting. + * Returns all remaining decoded segments (the tail drain pass). + * + * If streamComplete was already true (set by updateStreamCompleteFlag during the + * final processChunk call), the tail was already drained inside that call's + * while(true) loop — return immediately to avoid a second drain pass that would + * set streamingCompleted = true even if the first drain had a partial failure. + */ + async markStreamComplete(): Promise { + if (this.streamComplete) { + return []; + } + this.streamComplete = true; + const results: DecodedChunkResult[] = []; + while (true) { + const segment = await this.tryDecodeNextSegment(); + if (!segment) break; + results.push(segment); + } + return results; + } + /** * Reset decoder state */ @@ -229,8 +352,11 @@ export class StreamDecoder { this.rawChunks = []; this.totalRawBytes = 0; this.processedBytes = 0; - this.isFirstChunk = true; this.totalStreamLength = 0; + this.streamComplete = false; + this.headerBytesReceived = 0; + this.headerSearchChunks = []; + this.headerError = null; } /** @@ -242,8 +368,11 @@ export class StreamDecoder { this.rawChunks = []; this.totalRawBytes = 0; this.processedBytes = 0; - this.isFirstChunk = true; this.totalStreamLength = totalStreamLength; + this.streamComplete = false; + this.headerBytesReceived = 0; + this.headerSearchChunks = []; + this.headerError = null; // wavHeader will be reparsed from the new stream (server sends fresh header) this.wavHeader = null; console.log(`StreamDecoder reinitialized for offset: expecting ${totalStreamLength} bytes`); diff --git a/DeepDrftWeb/Interop/audio/index.ts b/DeepDrftWeb/Interop/audio/index.ts index 164dcc2..48b9839 100644 --- a/DeepDrftWeb/Interop/audio/index.ts +++ b/DeepDrftWeb/Interop/audio/index.ts @@ -45,6 +45,12 @@ const DeepDrftAudio = { return player.startStreamingPlayback(); }, + markStreamComplete: async (playerId: string): Promise => { + const player = audioPlayers.get(playerId); + if (!player) return { success: false, error: 'Player not found' }; + return player.markStreamComplete(); + }, + ensureAudioContextReady: async (playerId: string): Promise => { const player = audioPlayers.get(playerId); if (!player) return { success: false, error: 'Player not found' }; diff --git a/DeepDrftWeb/Interop/wavutils.ts b/DeepDrftWeb/Interop/wavutils.ts index 7ff70fa..d0c6ac4 100644 --- a/DeepDrftWeb/Interop/wavutils.ts +++ b/DeepDrftWeb/Interop/wavutils.ts @@ -173,17 +173,26 @@ class WavUtils { buffer[43] = (audioDataSize >> 24) & 0xFF; } - static getSampleAlignedChunkSize(header: WavHeader, maxChunkSize: number, availableDataSize: number): number { + static getSampleAlignedChunkSize(header: WavHeader, maxChunkSize: number, availableDataSize: number, streamComplete: boolean = false): number { const frameSize = header.blockAlign; - - // Much smaller minimum for streaming - just enough for Web Audio API + + // Much smaller minimum for streaming - just enough for Web Audio API. + // The minimum exists to avoid decoding partial-frame artifacts on + // mid-stream chunks while the rest is still in flight. Once the stream + // is fully received, we must drain whatever remains regardless of size, + // otherwise the trailing tail (often <512 bytes) is silently lost. const minAudioBytes = Math.max(512, frameSize * 10); // At least 512 bytes or 10 frames - - // If we don't have enough data, return 0 to wait for more - if (availableDataSize < minAudioBytes) { + + // Mid-stream guard: wait for more data if below minimum. + if (!streamComplete && availableDataSize < minAudioBytes) { return 0; } - + + // Even when complete we still need at least one full frame to decode. + if (availableDataSize < frameSize) { + return 0; + } + // Calculate frames for the available data const requestedSize = Math.min(maxChunkSize, availableDataSize); const frames = Math.floor(requestedSize / frameSize);