diff --git a/DeepDrftPublic.Client/Clients/TrackMediaClient.cs b/DeepDrftPublic.Client/Clients/TrackMediaClient.cs index fa65636..191cd2e 100644 --- a/DeepDrftPublic.Client/Clients/TrackMediaClient.cs +++ b/DeepDrftPublic.Client/Clients/TrackMediaClient.cs @@ -20,13 +20,24 @@ public class TrackMediaResponse : IDisposable /// public string ContentType { get; } + /// + /// The total file length in bytes, parsed from the 206 response's Content-Range: + /// bytes start-end/TOTAL header (Phase 21 Direction B). Null when the server returned + /// 200 (no Content-Range) — callers fall back to as the total. + /// This is the EOF boundary the segment loop advances its cursor toward, and the full + /// logical length the JS decoder must see (so a bounded segment's small Content-Length + /// never trips the decoder's byte-count completion early). + /// + public long? TotalLength { get; } + private readonly HttpResponseMessage _response; - public TrackMediaResponse(Stream stream, long contentLength, string contentType, HttpResponseMessage response) + public TrackMediaResponse(Stream stream, long contentLength, string contentType, long? totalLength, HttpResponseMessage response) { Stream = stream; ContentLength = contentLength; ContentType = contentType; + TotalLength = totalLength; _response = response; } @@ -54,6 +65,15 @@ public class TrackMediaClient /// token aborts the in-flight server connection rather than leaving the server /// draining bytes into a dead socket. /// + /// (Phase 21 Direction B) bounds the request to a single + /// segment: when set, the Range header is bytes={byteOffset}-{byteEnd} (inclusive), + /// so the browser holds at most ~one segment of raw bytes regardless of file size — the + /// network-memory bound this phase exists for. When null the request is open-ended + /// (bytes={byteOffset}-), the pre-Direction-B behaviour. Either way the response's + /// Content-Range total is surfaced via + /// so the caller knows the EOF boundary and the full logical length the decoder must see. + /// + /// /// selects the delivery rendering (Phase 18): the default /// sends no format query param, so existing /// callers hit the byte-identical pre-Phase-18 endpoint; @@ -65,12 +85,13 @@ public class TrackMediaClient public async Task> GetTrackMedia( string trackId, long byteOffset = 0, + long? byteEnd = null, AudioFormat format = AudioFormat.Lossless, CancellationToken cancellationToken = default) { try { - // Same URL for every seek — only the Range header differs. byteOffset 0 is + // Same URL for every fetch — only the Range header differs. byteOffset 0 is // not special-cased: "bytes=0-" requests the whole file from the start. // Lossless omits the format param entirely so the request is byte-identical to // the pre-Phase-18 endpoint; only Opus appends ?format=opus. @@ -78,18 +99,19 @@ public class TrackMediaClient ? $"api/track/{trackId}" : $"api/track/{trackId}?format={format.ToString().ToLowerInvariant()}"; using var request = new HttpRequestMessage(HttpMethod.Get, uri); - request.Headers.Range = new RangeHeaderValue(byteOffset, null); + // Bounded (byteEnd set) → "bytes=start-end" so the server returns a finite 206 + // slice and the browser buffers only that segment; open-ended (byteEnd null) → + // "bytes=start-". The server honours both via File(..., enableRangeProcessing: true), + // which parses the full RFC 7233 range grammar and slices accordingly. + request.Headers.Range = new RangeHeaderValue(byteOffset, byteEnd); // Stream the response body incrementally instead of buffering it whole (Phase 21.4 fix). // In Blazor WebAssembly the HttpClient is backed by the browser fetch API; without this the - // browser buffers the ENTIRE body before the response stream yields a byte, so the 21.2 - // read-loop pause (StreamingAudioPlayerService) backpressures nothing — the whole payload is - // already in memory. Enabling streaming makes ReadAsync pull from a browser ReadableStream - // whose backpressure reaches the underlying fetch, so pausing reads genuinely throttles the - // network. This is a request-option flag, not a runtime call: on the SSR server-to-server path - // the SocketsHttpHandler simply ignores the unknown option, so it is safe unguarded. Applies to - // BOTH the initial stream (byteOffset 0) and the seek/refill Range requests (21.3) — both share - // this method, so both depend on the same backpressure. + // browser buffers the ENTIRE body before the response stream yields a byte. With Direction B + // each request is already bounded to one segment, so the body is small regardless — but + // streaming still lets us read it incrementally and is harmless on the SSR server-to-server + // path (SocketsHttpHandler ignores the unknown option). Kept for both the initial and the + // seek/refill paths since both share this method. request.SetBrowserResponseStreamingEnabled(true); // Use HttpCompletionOption.ResponseHeadersRead to get stream immediately @@ -100,11 +122,15 @@ public class TrackMediaClient // Default to WAV when the server omits the header — the only format shipping // today — so the JS factory always receives a usable media type. var contentType = response.Content.Headers.ContentType?.MediaType ?? "audio/wav"; + // Content-Range "bytes start-end/TOTAL" carries the full file length on a 206; on a 200 + // there is no Content-Range, so TotalLength is null and callers use ContentLength. + var totalLength = response.Content.Headers.ContentRange?.Length; var stream = await response.Content.ReadAsStreamAsync(cancellationToken); // TrackMediaResponse takes ownership of both stream and response; // do NOT dispose response here — the caller disposes via TrackMediaResponse.Dispose(). - return ApiResult.CreatePassResult(new TrackMediaResponse(stream, contentLength, contentType, response)); + return ApiResult.CreatePassResult( + new TrackMediaResponse(stream, contentLength, contentType, totalLength, response)); } catch (Exception e) { diff --git a/DeepDrftPublic.Client/Services/StreamingAudioPlayerService.cs b/DeepDrftPublic.Client/Services/StreamingAudioPlayerService.cs index 3996dd0..5fece45 100644 --- a/DeepDrftPublic.Client/Services/StreamingAudioPlayerService.cs +++ b/DeepDrftPublic.Client/Services/StreamingAudioPlayerService.cs @@ -18,12 +18,23 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS private const int MaxBufferSize = 64 * 1024; // 64KB maximum // Phase 21.2a back-pressure poll interval. While the scheduler is over its forward high-water - // mark, the read loop stops calling ReadAsync and polls IsProductionPaused at this cadence - // until the fill drains below low-water. 100 ms is well under the low-water lookahead (seconds), - // so resume is prompt relative to the playhead — no starvation (AC3) — while keeping the poll - // cheap. The poll honors the loop's cancellation token, so a track switch/seek during a pause - // exits through the same drain discipline as a pause during ReadAsync (C6). + // mark, the segment loop stops fetching the next segment and polls IsProductionPaused at this + // cadence until the fill drains below low-water. 100 ms is well under the low-water lookahead + // (seconds), so resume is prompt relative to the playhead — no starvation (AC3) — while keeping + // the poll cheap. The poll honors the loop's cancellation token, so a track switch/seek during a + // pause exits through the same drain discipline as a pause during ReadAsync (C6). private const int BackpressurePollMs = 100; + + // Phase 21 Direction B — forward Range-segment size. The forward stream is fetched as a + // sequence of bounded "bytes=cursor-(cursor+SegmentSizeBytes-1)" 206 requests, the next issued + // only when the scheduler drains below low-water. Because each request is bounded and fully + // consumed before the next is issued, the browser fetch holds AT MOST ~one segment of raw bytes + // regardless of file size — this is the network-memory bound the phase exists for (the open-ended + // single GET buffered the whole ~970 MB body in the browser even when reads were paused, the + // 21.4 finding). 4 MB balances request overhead (a 1 GB mix is ~250 segments) against memory: + // at the 30 s high-water mark a fast connection holds well under a segment of unplayed raw bytes, + // so the bound is the segment size, not the decoded window. Tunable; not magic. + private const long SegmentSizeBytes = 4 * 1024 * 1024; private int _currentBufferSize = DefaultBufferSize; private int _consecutiveSlowReads = 0; @@ -35,14 +46,6 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS public int BufferedChunks { get; private set; } = 0; public bool IsSeekingBeyondBuffer { get; private set; } = false; - // ─────────────────────────────────────────────────────────────────────────────────────────── - // [BP-DIAG] Phase 21.4 back-pressure diagnostic. TEMPORARY — strip once the cause is confirmed - // in Daniel's browser run. Logs every Nth chunk's ProductionPaused flag plus pause-poll - // enter/exit so a grep for "[BP-DIAG]" in the WASM console tells whether the read loop ever sees - // the pause signal and whether the poll actually holds. Throttled by chunk count to avoid flooding. - private const int BpDiagChunkLogEvery = 16; - // ─────────────────────────────────────────────────────────────────────────────────────────── - private bool _streamingPlaybackStarted = false; private CancellationTokenSource? _streamingCancellation; private Task? _activeStreamingTask; @@ -203,23 +206,29 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS // seek-beyond-buffer re-fetch reuses the same artifact. _currentFormat = await ResolveStreamFormatAsync(track.EntryKey, loadCts.Token); - // Pass the streaming token to the HTTP layer so a navigation/track switch - // aborts the server connection instead of leaving it draining bytes. - var mediaResult = await _trackMediaClient.GetTrackMedia( + // Direction B: fetch the FIRST bounded segment to learn the total file length and the + // content type. The 206 Content-Range carries the total; the segment loop advances its + // cursor toward it. The decoder is initialized with the TOTAL length (not the segment + // length) so a bounded segment's small Content-Length never trips its byte-count + // completion early — segment boundaries are invisible to the decoder, which sees one + // continuous in-order byte stream. Passing the streaming token aborts the server + // connection on a navigation/track switch instead of leaving it draining bytes. + var firstSegment = await _trackMediaClient.GetTrackMedia( track.EntryKey, byteOffset: 0, + byteEnd: SegmentSizeBytes - 1, format: _currentFormat, cancellationToken: loadCts.Token); - if (!mediaResult.Success) + if (!firstSegment.Success) { - var technicalError = mediaResult.GetMessage(); + var technicalError = firstSegment.GetMessage(); _logger.LogError("Failed to get track media for {TrackId}: {Error}", track.EntryKey, technicalError); ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError); return; } - if (mediaResult.Value == null) + if (firstSegment.Value == null) { const string technicalError = "No audio returned from server"; _logger.LogError("No audio data returned for track {TrackId}", track.EntryKey); @@ -227,13 +236,22 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS return; } - using var audio = mediaResult.Value; + // Ownership of the first segment transfers to the segment loop, which disposes it (and + // every subsequent segment). No `using` here — a double dispose is avoided and the socket + // is released the moment the loop finishes consuming the segment. + var audio = firstSegment.Value; - // Initialize streaming mode with content length and media type (drives - // JS format-decoder selection). - var streamingResult = await _audioInterop.InitializeStreaming(PlayerId, audio.ContentLength, audio.ContentType); + // The total logical length the decoder must see. On a 206 the Content-Range carries it; + // a 200 (server ignored Range / file ≤ one segment) has no Content-Range, so fall back to + // the body's own Content-Length — that body IS the whole file in that case. + var totalLength = audio.TotalLength ?? audio.ContentLength; + + // Initialize streaming mode with the TOTAL length and media type (drives JS + // format-decoder selection). See above: total, not segment, length. + var streamingResult = await _audioInterop.InitializeStreaming(PlayerId, totalLength, audio.ContentType); if (!streamingResult.Success) { + audio.Dispose(); var technicalError = $"Failed to initialize streaming: {streamingResult.Error}"; _logger.LogError("Streaming initialization failed for track {TrackId}: {Error}", track.EntryKey, technicalError); @@ -241,7 +259,10 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS return; } - _activeStreamingTask = StreamAudioWithEarlyPlayback(audio, loadCts.Token); + // Forward segmentation from byte 0. The first segment is already in hand; the loop pumps + // it, then fetches subsequent bounded segments gated on the scheduler fill signal. + _activeStreamingTask = RunSegmentedStreamAsync( + track.EntryKey, audio, cursor: 0, totalLength, seekPosition: null, loadCts.Token); await _activeStreamingTask; } catch (OperationCanceledException) when (loadCts.IsCancellationRequested) @@ -263,10 +284,33 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS catch (Exception ex) { StreamingErrorHandler.LogError(_logger, ex, "LoadTrackStreaming", track.EntryKey); - ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(ex.Message); - LoadProgress = 0; - IsLoaded = false; - IsStreamingMode = false; + var userError = StreamingErrorHandler.GetUserFriendlyMessage(ex.Message); + + // Mid-stream failure (playback was already underway): halt the JS scheduler into a clean + // paused-but-loaded state exactly as the seek path does via RecoverFromFailedRefill, rather + // than resetting to unloaded and letting the scheduler's buffered tail drain into a silent + // false end (AC6). Apply only when this load is still the active operation — a superseding + // seek owns state and has already replaced _streamingCancellation with its own CTS. + if (_streamingPlaybackStarted && ReferenceEquals(_streamingCancellation, loadCts)) + { + await RecoverFromFailedRefill(CurrentTime, userError); + } + else if (ReferenceEquals(_streamingCancellation, loadCts)) + { + // First-segment failure (nothing buffered / playing yet), still the active operation: + // the normal unload-to-error path is correct — nothing is in the scheduler to halt. + ErrorMessage = userError; + LoadProgress = 0; + IsLoaded = false; + IsStreamingMode = false; + } + else + { + // Superseded load: a newer seek (or track switch) has already claimed _streamingCancellation + // and owns all shared state. Writing IsLoaded/IsStreamingMode here would corrupt the live + // operation — mirror the OCE catch's identity guard and do nothing to shared state. + _logger.LogDebug("Generic throw on superseded load for track {TrackId} — newer operation owns state, skipping unload", track.EntryKey); + } } finally { @@ -385,176 +429,223 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS return profile; } - private async Task StreamAudioWithEarlyPlayback(TrackMediaResponse audio, CancellationToken cancellationToken) + /// + /// Phase 21 Direction B — the single segmented forward read loop, shared by the initial load and + /// the seek/refill path (the convergence C1/C5 require: one cursor, one fetch mechanism, no forked + /// path). It pumps the FIRST segment (already fetched by the caller), then fetches subsequent + /// bounded bytes=cursor-(cursor+SegmentSizeBytes-1) 206 segments — each only AFTER the + /// scheduler drains below low-water — until the cursor reaches . + /// Because each segment is bounded and fully consumed before the next is requested, the browser + /// holds at most ~one segment of raw bytes (the network-memory bound), while the decoder sees one + /// continuous in-order byte stream across segment boundaries (the demuxer/decoder buffer partial + /// frames/pages across the boundary exactly as for arbitrary chunks today — no per-segment reinit). + /// + /// The already-fetched first segment (byte ). + /// Owned by this method, which disposes it; subsequent segments are fetched and disposed inline. + /// File-absolute byte offset the first segment starts at (0 for a fresh load, + /// the resolved seek offset for a refill). + /// Total file length in bytes — the EOF boundary the cursor advances + /// toward. The decoder is initialized/reinitialized against this, not the per-segment length. + /// Non-null for a seek/refill: the decoder is reinitialized for the + /// header-less Range continuation at this time before the first segment's bytes are fed (WAV + /// retains its header, Opus re-applies the cached setup + lead-trim). Null for a forward load from + /// byte 0, where the first segment carries the header and no reinit is needed. + private async Task RunSegmentedStreamAsync( + string trackId, + TrackMediaResponse firstSegment, + long cursor, + long totalLength, + double? seekPosition, + CancellationToken cancellationToken) { byte[]? buffer = null; + var segment = firstSegment; try { - long totalBytesRead = 0; - buffer = ArrayPool.Shared.Rent(MaxBufferSize); // Rent larger buffer to accommodate adaptive sizing - int currentBytes; - var readTimer = System.Diagnostics.Stopwatch.StartNew(); - var bpDiagChunkIndex = 0; // [BP-DIAG] per-stream chunk counter for throttled logging - - do + // Seek/refill: reinitialize the active decoder for the header-less continuation ONCE, + // before any continuation bytes are fed. Forward-from-zero (seekPosition null) skips this + // — its first segment carries the real header the decoder parses. Done here, inside the + // single loop, so seek and forward share the same fetch+pump mechanism (no forked path). + if (seekPosition is { } resumeAt) { - readTimer.Restart(); - currentBytes = await audio.Stream.ReadAsync(buffer, 0, _currentBufferSize, cancellationToken); - readTimer.Stop(); - - // Adapt buffer size based on read performance - AdaptBufferSize(currentBytes, readTimer.ElapsedMilliseconds); - - if (currentBytes > 0) + // The decoder byte-counts the header-less continuation against the bytes REMAINING + // from the range start to EOF (total − cursor), not the absolute total — that is what + // reinitializeForRangeContinuation expects (StreamDecoder.remainingByteLength). The + // loop's own cursor still targets the absolute totalLength for EOF. + var remainingBytes = Math.Max(0, totalLength - cursor); + var reinitResult = await _audioInterop.ReinitializeFromOffset(PlayerId, remainingBytes, resumeAt); + if (!reinitResult.Success) { - totalBytesRead += currentBytes; - - // Always slice to the exact number of bytes read. The pooled buffer - // is rented at MaxBufferSize and may carry stale bytes past - // currentBytes from a prior rental — handing the full array to JS - // interop would serialise that garbage into the audio stream. - var actualBuffer = buffer.AsSpan(0, currentBytes).ToArray(); - - // Process chunk for streaming - var chunkResult = await _audioInterop.ProcessStreamingChunk(PlayerId, actualBuffer); - if (!chunkResult.Success) - { - var error = $"Failed to process streaming chunk: {chunkResult.Error}"; - _logger.LogWarning("Chunk processing failed: {Error}", error); - throw new Exception(error); - } - - // Update streaming state - CanStartStreaming = chunkResult.CanStartStreaming; - HeaderParsed = chunkResult.HeaderParsed; - BufferedChunks = chunkResult.BufferCount; - - // [BP-DIAG] Phase 21.4 — throttled per-chunk view of the back-pressure signal as - // the C# loop sees it. If ProductionPaused never logs True while bytes keep - // flowing, the break is upstream (JS latch / lookahead math); if it logs True but - // the transfer still races to 100%, the break is the transport (browser buffered - // the whole body, SetBrowserResponseStreamingEnabled not in effect). TEMPORARY. - if (bpDiagChunkIndex % BpDiagChunkLogEvery == 0) - { - _logger.LogInformation( - "[BP-DIAG] chunk #{Chunk} bytesRead={Bytes} totalRead={Total} bufferCount={BufCount} canStart={CanStart} productionPaused={Paused} isPaused={IsPaused}", - bpDiagChunkIndex, currentBytes, totalBytesRead, chunkResult.BufferCount, - chunkResult.CanStartStreaming, chunkResult.ProductionPaused, IsPaused); - } - bpDiagChunkIndex++; - - // Set duration from WAV header when available (only set once) - if (chunkResult.Duration.HasValue && Duration == null) - { - Duration = chunkResult.Duration.Value; - _logger.LogInformation("Duration set from WAV header: {Duration:F2} seconds", Duration); - // Feed the same once-only duration to the play session so it can compute the - // completion fraction at close. Safe before/after session open — SetDuration - // is a no-op when no session is open and idempotent otherwise. - _playTracker?.SetDuration(chunkResult.Duration.Value); - } - - // Start playback as soon as we can - if (!_streamingPlaybackStarted && CanStartStreaming) - { - var playbackResult = await _audioInterop.StartStreamingPlayback(PlayerId); - if (playbackResult.Success) - { - _streamingPlaybackStarted = true; - IsPlaying = true; - IsPaused = false; - IsLoaded = true; // Track is loaded and ready to play (even if still downloading) - ErrorMessage = null; - - // Open the play session exactly once per load, at the moment playback truly - // begins (§2.1). The _sessionOpened guard keeps the SeekBeyondBuffer re-stream - // — which re-enters this transition with _streamingPlaybackStarted reset — - // from opening a second session for the same play. Duration may already be - // known from a prior chunk, so re-feed it after opening. - if (!_sessionOpened && _currentTrackId is { } trackKey) - { - _sessionOpened = true; - _playTracker?.OnPlaybackStarted(trackKey); - if (Duration is { } d) - _playTracker?.SetDuration(d); - } - - await NotifyStateChanged(); // Immediate notification for critical state change - } - else - { - var technicalError = $"Failed to start streaming playback: {playbackResult.Error}"; - _logger.LogError("Failed to start playback: {Error}", technicalError); - ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError); - } - } - - // Update progress - if (audio.ContentLength > 0) - { - LoadProgress = Math.Min(1.0, (double)totalBytesRead / audio.ContentLength); - } - - await ThrottledNotifyStateChanged(); - - // Phase 21.2a back-pressure (serves BOTH paths). The chunk we just processed - // reported the scheduler's forward fill is over the high-water mark — stop - // reading the socket so the unplayed decoded region stays bounded. Pausing - // ReadAsync lets the kernel TCP window close (we are working WITH transport flow - // control, not against it). Poll until the fill drains below low-water, then - // resume the loop. For WAV this is the whole story (StreamDecoder decodes - // synchronously into the scheduler); the Opus feed additionally self-throttles - // its demux/decode off the SAME signal (21.2b), so its upstream queues stay - // near-empty too. The poll awaits on cancellationToken, so a track switch/seek - // mid-pause throws OCE and unwinds through the existing drain discipline (C6) — - // no separate cancellation path, no stale read racing a reinit. - if (chunkResult.ProductionPaused) - { - // [BP-DIAG] Phase 21.4 — the read loop is ENTERING the pause-poll: reads have - // stopped, the socket should now stall and the transfer should plateau. If you - // see this line but the network transfer still completes, the transport is - // buffered (streaming flag not in effect). TEMPORARY. - _logger.LogInformation( - "[BP-DIAG] ENTER pause-poll at chunk #{Chunk} totalRead={Total} isPaused={IsPaused}", - bpDiagChunkIndex, totalBytesRead, IsPaused); - var bpDiagPollCount = 0; - - // UC5: while the user is paused, the playhead is frozen so forward lookahead - // never shrinks and the poll would spin indefinitely. Wait here until playback - // resumes (IsPaused clears) OR the fill drains on its own. Cancellation is - // unchanged: a track switch/seek/stop while paused still throws OCE and unwinds - // through the existing drain discipline (C6) — no weakening of the cancel path. - while (IsPaused || await _audioInterop.IsProductionPaused(PlayerId)) - { - cancellationToken.ThrowIfCancellationRequested(); - // [BP-DIAG] Phase 21.4 — heartbeat every ~1 s (10 × 100 ms) so a held poll - // is visible without flooding; shows the loop is genuinely parked. TEMPORARY. - if (bpDiagPollCount % 10 == 0) - { - _logger.LogInformation( - "[BP-DIAG] HOLD pause-poll iter={Iter} isPaused={IsPaused}", - bpDiagPollCount, IsPaused); - } - bpDiagPollCount++; - await Task.Delay(BackpressurePollMs, cancellationToken); - } - - // [BP-DIAG] Phase 21.4 — the read loop is EXITING the pause-poll and resuming - // ReadAsync: the fill drained below low-water. TEMPORARY. - _logger.LogInformation( - "[BP-DIAG] EXIT pause-poll at chunk #{Chunk} after {Iters} polls", - bpDiagChunkIndex, bpDiagPollCount); - } + throw new Exception($"Failed to reinitialize for offset streaming: {reinitResult.Error}"); } - } while (currentBytes > 0); + } - // Notify the JS decoder that the stream is finished. When the server omits - // Content-Length the StreamDecoder cannot determine completion via byte counting - // alone; this explicit signal ensures the tail-decoding path (streamComplete=true) - // fires regardless of whether Content-Length was present. + buffer = ArrayPool.Shared.Rent(MaxBufferSize); // larger rental to fit adaptive sizing + var readTimer = System.Diagnostics.Stopwatch.StartNew(); + + // Segment loop. Each iteration fully consumes one bounded 206 body, advancing the cursor by + // the bytes received. The next segment is fetched only when the scheduler is below + // high-water (the inter-segment gate). EOF is the cursor reaching totalLength, or a short + // segment (server returned fewer bytes than requested — the final slice). + while (true) + { + long segmentBytesRead = 0; + int currentBytes; + do + { + readTimer.Restart(); + currentBytes = await segment.Stream.ReadAsync(buffer, 0, _currentBufferSize, cancellationToken); + readTimer.Stop(); + + AdaptBufferSize(currentBytes, readTimer.ElapsedMilliseconds); + + if (currentBytes > 0) + { + segmentBytesRead += currentBytes; + + // Slice to the exact bytes read: the pooled buffer is rented at MaxBufferSize + // and may carry stale bytes past currentBytes from a prior rental — handing the + // full array to JS would serialise that garbage into the audio stream. + var actualBuffer = buffer.AsSpan(0, currentBytes).ToArray(); + + var chunkResult = await _audioInterop.ProcessStreamingChunk(PlayerId, actualBuffer); + if (!chunkResult.Success) + { + var error = $"Failed to process streaming chunk: {chunkResult.Error}"; + _logger.LogWarning("Chunk processing failed: {Error}", error); + throw new Exception(error); + } + + CanStartStreaming = chunkResult.CanStartStreaming; + HeaderParsed = chunkResult.HeaderParsed; + BufferedChunks = chunkResult.BufferCount; + // chunkResult.ProductionPaused is informational only on this path — back-pressure + // granularity is one segment (the inter-segment gate below), not per-chunk. + + // Set duration from header when available (only set once) + if (chunkResult.Duration.HasValue && Duration == null) + { + Duration = chunkResult.Duration.Value; + _logger.LogInformation("Duration set from header: {Duration:F2} seconds", Duration); + // Feed the once-only duration to the play session for the completion + // fraction. No-op when no session is open; idempotent otherwise. + _playTracker?.SetDuration(chunkResult.Duration.Value); + } + + // Start playback as soon as we can — at the min-buffer threshold, exactly as + // before (C2: first audio is not gated on the segment boundary; the first + // segment alone clears the threshold). + if (!_streamingPlaybackStarted && CanStartStreaming) + { + var playbackResult = await _audioInterop.StartStreamingPlayback(PlayerId); + if (playbackResult.Success) + { + _streamingPlaybackStarted = true; + IsPlaying = true; + IsPaused = false; + IsLoaded = true; // loaded and ready, even while still downloading + ErrorMessage = null; + + // Open the play session exactly once per load, at the moment playback + // truly begins (§2.1). The _sessionOpened guard keeps a seek/refill + // re-stream — which re-enters this transition with + // _streamingPlaybackStarted reset — from opening a second session for + // the same play. Duration may already be known, so re-feed it. + if (!_sessionOpened && _currentTrackId is { } trackKey) + { + _sessionOpened = true; + _playTracker?.OnPlaybackStarted(trackKey); + if (Duration is { } d) + _playTracker?.SetDuration(d); + } + + await NotifyStateChanged(); // immediate — critical state change + } + else + { + var technicalError = $"Failed to start streaming playback: {playbackResult.Error}"; + _logger.LogError("Failed to start playback: {Error}", technicalError); + ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError); + } + } + + // Progress against the total file length (cursor + bytes consumed so far). + if (totalLength > 0) + { + LoadProgress = Math.Min(1.0, (double)(cursor + segmentBytesRead) / totalLength); + } + + await ThrottledNotifyStateChanged(); + } + } while (currentBytes > 0); + + // Segment fully consumed; advance the cursor and release this segment's stream/socket + // before deciding whether to fetch the next. Disposing here keeps exactly one segment's + // raw bytes resident at a time. + cursor += segmentBytesRead; + segment.Dispose(); + segment = null!; + + // EOF: cursor reached the total file length. This is the sole forward-EOF condition. + // A short segment body (segmentBytesRead < SegmentSizeBytes) is NOT an EOF signal — + // the inner read loop fully drains the HTTP body, so a short body means the server + // sent fewer bytes than the bounded range we requested. While cursor < totalLength that + // can only be a connection drop / truncated stream, NOT the file tail — route it to + // the same clean-failure recovery as a fetch error rather than silently completing. + var reachedTotal = totalLength > 0 && cursor >= totalLength; + if (reachedTotal) + { + break; + } + + // Guard: if the body was short but we haven't reached totalLength, the stream was + // truncated mid-segment (connection drop / premature close). Surface as an error so + // the scheduler is halted rather than left to drain its buffered tail into a false end. + if (segmentBytesRead < SegmentSizeBytes) + { + throw new Exception( + $"Stream truncated at byte {cursor} of {totalLength}: received {segmentBytesRead} bytes " + + $"but expected up to {SegmentSizeBytes} and have not reached EOF"); + } + + // Inter-segment back-pressure gate (Phase 21.2 fill signal, now gating SEGMENT FETCH + // instead of pacing ReadAsync on an open stream). Do not fetch the next segment while + // the scheduler is over high-water; wait until it drains below low-water. Because the + // browser only buffers bounded segments and we hold off requesting the next one, raw + // network memory stays at ~one segment. The poll awaits on cancellationToken, so a + // track switch/seek mid-wait throws OCE and unwinds through the existing drain + // discipline (C6). UC5: a user pause freezes the playhead so the fill never drains — + // hold here until playback resumes (IsPaused clears) OR the fill drains on its own. + while (IsPaused || await _audioInterop.IsProductionPaused(PlayerId)) + { + cancellationToken.ThrowIfCancellationRequested(); + await Task.Delay(BackpressurePollMs, cancellationToken); + } + + // Fetch the next bounded segment. The end offset is clamped implicitly by the server + // (a request past EOF yields the available tail as a short slice, caught above). + var nextEnd = cursor + SegmentSizeBytes - 1; + var nextResult = await _trackMediaClient.GetTrackMedia( + trackId, + byteOffset: cursor, + byteEnd: nextEnd, + format: _currentFormat, + cancellationToken: cancellationToken); + if (!nextResult.Success || nextResult.Value == null) + { + var technicalError = nextResult.GetMessage() ?? "Failed to fetch next stream segment"; + _logger.LogError("Failed to fetch segment at offset {Offset} for {TrackId}: {Error}", + cursor, trackId, technicalError); + throw new Exception(technicalError); + } + segment = nextResult.Value; + } + + // Notify the JS decoder that the stream is finished. The decoder marks completion by byte + // count against the total it was initialized with; this explicit signal flushes the + // residual tail and covers the (rare) case where the total was unknown. await _audioInterop.MarkStreamCompleteAsync(PlayerId); - // Mark as fully loaded LoadProgress = 1.0; await NotifyStateChanged(); } @@ -565,7 +656,7 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS } catch (Exception ex) { - StreamingErrorHandler.LogError(_logger, ex, "StreamAudioWithEarlyPlayback"); + StreamingErrorHandler.LogError(_logger, ex, "RunSegmentedStreamAsync"); ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(ex.Message); LoadProgress = 0; IsLoaded = false; @@ -575,6 +666,8 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS } finally { + // Release the last segment (if a fetch failed mid-loop it may still be held) and the buffer. + segment?.Dispose(); if (buffer != null) { ArrayPool.Shared.Return(buffer); @@ -653,6 +746,10 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS return; } + // Capture into a non-null local: _currentTrackId is the field a track-switch could clear, but + // this seek operates against the track loaded NOW; the segment loop needs a stable id. + var trackId = _currentTrackId; + IsSeekingBeyondBuffer = true; // Cancel the current streaming loop AND wait for it to fully exit before @@ -691,19 +788,22 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS CurrentTime = seekPosition; await NotifyStateChanged(); - // Request new stream from offset. Reuse the format the initial load resolved to (_currentFormat): - // an Opus seek must come back as Opus bytes so the cached setup header + page-aligned byteOffset - // (resolved by the JS decoder's index-based calculateByteOffset) match the continuation. The - // offset itself is computed JS-side from the Opus seek index for Opus, exactly as it is from the - // WAV header for lossless — one seam, format-appropriate math (AC9 / §3.4a C). - var mediaResult = await _trackMediaClient.GetTrackMedia( - _currentTrackId, + // Request the FIRST bounded segment from the resolved offset (Direction B — converged with + // the forward path). Reuse the format the initial load resolved to (_currentFormat): an + // Opus seek must come back as Opus bytes so the cached setup header + page-aligned + // byteOffset (resolved JS-side from the Opus seek index) match the continuation; WAV resolves + // its offset from the header — one seam, format-appropriate math (AC9 / §3.4a C). The + // segment loop then continues forward segmentation from this offset exactly as a fresh load + // does from 0 — no forked fetch path (C1/C5). + var firstSegment = await _trackMediaClient.GetTrackMedia( + trackId, byteOffset, + byteEnd: byteOffset + SegmentSizeBytes - 1, format: _currentFormat, cancellationToken: seekCts.Token); - if (!mediaResult.Success || mediaResult.Value == null) + if (!firstSegment.Success || firstSegment.Value == null) { - var technicalError = mediaResult.GetMessage() ?? "Failed to load audio from position"; + var technicalError = firstSegment.GetMessage() ?? "Failed to load audio from position"; _logger.LogError("Failed to get track media from offset {Offset}: {Error}", byteOffset, technicalError); // Guard: a superseded seek must NOT touch shared state. The newer seek owns teardown. if (IsStillActiveSeek()) @@ -717,33 +817,24 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS return; } - using var audio = mediaResult.Value; + var audio = firstSegment.Value; + // The absolute EOF boundary the segment loop's cursor targets. On a 206 the Content-Range + // carries the file total; on a 200 (single-segment file) fall back to cursor + body length. + var totalLength = audio.TotalLength ?? (byteOffset + audio.ContentLength); - // Reinitialize JS player for offset streaming - var reinitResult = await _audioInterop.ReinitializeFromOffset(PlayerId, audio.ContentLength, seekPosition); - if (!reinitResult.Success) - { - _logger.LogError("Failed to reinitialize for offset streaming: {Error}", reinitResult.Error); - // Guard: same single-writer discipline — only recover when we are still the active seek. - if (IsStillActiveSeek()) - { - await RecoverFromFailedRefill(seekPosition, "Failed to seek to position"); - } - else - { - _logger.LogDebug("Reinit failed on superseded seek to {Position} — newer seek owns state, skipping recovery", seekPosition); - } - return; - } - - // Reset streaming state for new stream + // Reset streaming state for the new stream. The decoder reinit for the header-less + // continuation happens INSIDE RunSegmentedStreamAsync (seekPosition non-null), so seek and + // forward share one fetch+pump+reinit mechanism. A reinit failure there throws and lands in + // the catch below, which recovers when still the active seek — the same clean-failure path + // (AC6) the old explicit reinit branch had, now unified with the fetch-failure path. _streamingPlaybackStarted = false; CanStartStreaming = false; HeaderParsed = false; BufferedChunks = 0; - // Stream audio from offset - _activeStreamingTask = StreamAudioWithEarlyPlayback(audio, seekCts.Token); + // Stream from offset via the shared segment loop. Ownership of `audio` transfers to it. + _activeStreamingTask = RunSegmentedStreamAsync( + trackId, audio, cursor: byteOffset, totalLength, seekPosition, seekCts.Token); await _activeStreamingTask; IsSeekingBeyondBuffer = false; @@ -795,10 +886,16 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS _logger.LogWarning("Refill-failure recovery interop did not succeed: {Error}", recovered.Error); } - // Settle C# into the matching recoverable state: not playing, paused at the target, still loaded. + // Settle C# into the matching recoverable state: not playing, paused at the target, still loaded + // and still in streaming mode. IsLoaded = true and IsStreamingMode = true are both load-bearing — + // the "paused-but-loaded" contract lets the listener retry the seek (Seek early-returns when + // !IsLoaded || !IsStreamingMode), resume via TogglePlayPause, or pick another track. Resetting + // either to false would wedge at least one of the three retry routes (AC6 / Phase 21.3). ErrorMessage = userFacingError; IsPlaying = false; IsPaused = true; + IsLoaded = true; + IsStreamingMode = true; CurrentTime = seekPosition; IsSeekingBeyondBuffer = false; await NotifyStateChanged(); diff --git a/DeepDrftPublic/Interop/audio/AudioPlayer.ts b/DeepDrftPublic/Interop/audio/AudioPlayer.ts index 6577294..d33572f 100644 --- a/DeepDrftPublic/Interop/audio/AudioPlayer.ts +++ b/DeepDrftPublic/Interop/audio/AudioPlayer.ts @@ -269,19 +269,13 @@ export class AudioPlayer { const headerParsed = decoder.ready; const canStart = headerParsed && this.scheduler.hasMinimumBuffers(this.minBuffersForPlayback); - // [BP-DIAG] Phase 21.4 — value of productionPaused actually placed on the Opus chunk - // result handed to C#. Confirms the flag is populated on THIS path (not just the WAV - // path). TEMPORARY — strip once confirmed. - const opusPaused = this.scheduler.evaluateProductionPause(); - this.bpDiagLogChunkResult('opus', canStart, opusPaused); - return { success: true, canStartStreaming: canStart, headerParsed, bufferCount: this.scheduler.getBufferCount(), duration: this.duration, - productionPaused: opusPaused + productionPaused: this.scheduler.evaluateProductionPause() }; } catch (error) { return { success: false, error: (error as Error).message }; @@ -320,18 +314,13 @@ export class AudioPlayer { const canStart = this.streamDecoder.headerParsed && this.scheduler.hasMinimumBuffers(this.minBuffersForPlayback); - // [BP-DIAG] Phase 21.4 — value of productionPaused actually placed on the WAV/MP3/FLAC - // chunk result handed to C#. TEMPORARY — strip once confirmed. - const formatPaused = this.scheduler.evaluateProductionPause(); - this.bpDiagLogChunkResult('format', canStart, formatPaused); - return { success: true, canStartStreaming: canStart, headerParsed: this.streamDecoder.headerParsed, bufferCount: this.scheduler.getBufferCount(), duration: this.duration, - productionPaused: formatPaused + productionPaused: this.scheduler.evaluateProductionPause() }; } catch (error) { return { success: false, error: (error as Error).message }; @@ -742,21 +731,6 @@ export class AudioPlayer { // ==================== Private Methods ==================== - // ───────────────────────────────────────────────────────────────────────────────────────── - // [BP-DIAG] Phase 21.4 back-pressure diagnostic. TEMPORARY — strip once confirmed in Daniel's - // browser run. Logs the productionPaused flag on the chunk result handed back to C#, throttled - // to ~4 Hz so it does not flood. Grep "[BP-DIAG] chunk-result" in the browser console. - private bpDiagChunkResultLastMs = 0; - private bpDiagLogChunkResult(path: 'opus' | 'format', canStart: boolean, paused: boolean): void { - const now = (typeof performance !== 'undefined' ? performance.now() : Date.now()); - if (now - this.bpDiagChunkResultLastMs < 250) return; - this.bpDiagChunkResultLastMs = now; - console.log( - `[BP-DIAG] chunk-result path=${path} productionPaused=${paused} canStart=${canStart} ` + - `bufCount=${this.scheduler.getBufferCount()} streamingStarted=${this.streamingStarted} isPlaying=${this.isPlaying}`); - } - // ───────────────────────────────────────────────────────────────────────────────────────── - private resetState(): void { this.isPlaying = false; this.isPaused = false; diff --git a/DeepDrftPublic/Interop/audio/PlaybackScheduler.ts b/DeepDrftPublic/Interop/audio/PlaybackScheduler.ts index 341e0e3..5379130 100644 --- a/DeepDrftPublic/Interop/audio/PlaybackScheduler.ts +++ b/DeepDrftPublic/Interop/audio/PlaybackScheduler.ts @@ -107,14 +107,6 @@ export class PlaybackScheduler { // Mutated by evaluateProductionPause() — named to signal the state-advance on each call. private productionPaused_: boolean = false; - // ───────────────────────────────────────────────────────────────────────────────────────── - // [BP-DIAG] Phase 21.4 back-pressure diagnostic. TEMPORARY — strip once the cause is confirmed - // in Daniel's browser run. Throttles evaluateProductionPause() logging to one line per ~250 ms - // so the console shows the live lookahead / byte-estimate / latch without flooding (the signal - // is evaluated on every chunk + every poll). Grep "[BP-DIAG]" in the browser console. - private bpDiagLastLogMs: number = 0; - // ───────────────────────────────────────────────────────────────────────────────────────── - // Callbacks public onPlaybackEnded: (() => void) | null = null; @@ -245,22 +237,6 @@ export class PlaybackScheduler { this.productionPaused_ = true; } - // [BP-DIAG] Phase 21.4 — the single source of truth for the latch decision. If `paused` - // never goes true while bytes keep arriving, the lookahead is not growing as expected: - // inspect `lookahead` vs `high` (should cross 30 during a fast fill) and `bufCount`/`bytes` - // (decode must actually be populating the scheduler). Throttled to ~4 Hz. TEMPORARY — strip - // once the cause is confirmed. (Uses performance.now when present; Date.now fallback.) - const bpNow = (typeof performance !== 'undefined' ? performance.now() : Date.now()); - if (bpNow - this.bpDiagLastLogMs >= 250) { - this.bpDiagLastLogMs = bpNow; - console.log( - `[BP-DIAG] evaluateProductionPause paused=${this.productionPaused_} ` + - `lookahead=${lookahead.toFixed(2)}s high=${this.forwardHighWaterSeconds} low=${this.forwardLowWaterSeconds} ` + - `bytes=${(this.getDecodedByteEstimate() / (1024 * 1024)).toFixed(1)}MB cap=${(this.maxDecodedBytes / (1024 * 1024)).toFixed(0)}MB ` + - `overByteCeiling=${overByteCeiling} bufCount=${this.buffers.length} pos=${this.getCurrentPosition().toFixed(2)}s ` + - `decodedEnd=${(this.getTotalDuration() + this.playbackOffset).toFixed(2)}s active=${this.isActive_}`); - } - return this.productionPaused_; } diff --git a/DeepDrftTests/SegmentedStreamLoopTests.cs b/DeepDrftTests/SegmentedStreamLoopTests.cs new file mode 100644 index 0000000..47978d6 --- /dev/null +++ b/DeepDrftTests/SegmentedStreamLoopTests.cs @@ -0,0 +1,427 @@ +using System.Collections.Concurrent; +using System.Net; +using System.Net.Http.Headers; +using DeepDrftModels.DTOs; +using DeepDrftPublic.Client.Clients; +using DeepDrftPublic.Client.Services; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.JSInterop; + +namespace DeepDrftTests; + +/// +/// Phase 21 Direction B — the segmented forward read loop in . +/// Drives a real SelectTrackStreaming against a fake JS runtime and a scripted HTTP handler that +/// serves bounded 206 segments, then asserts the loop's contract: +/// +/// forward playback fetches in bounded bytes=start-end segments (the network-memory bound); +/// the cursor advances contiguously across segment boundaries until the file total is reached (EOF); +/// the next segment is NOT fetched while the scheduler reports production paused (the fill gate); +/// a seek converges onto the SAME segment loop — reinit then continued segmentation, no forked path. +/// +/// True network/browser memory behaviour is Daniel's manual re-run; this pins the request sequencing and +/// gating the harness can observe. +/// +[TestFixture] +public class SegmentedStreamLoopTests +{ + private const long SegmentSize = 4 * 1024 * 1024; + + // Records every audio Range request and serves a bounded 206 slice. Audio bodies are zero-filled — + // the fake JS decoder does not inspect bytes; it scripts canStart/productionPaused directly. + private sealed class SegmentServer : HttpMessageHandler + { + private readonly long _total; + public List<(long From, long? To)> AudioRanges { get; } = new(); + + public SegmentServer(long total) => _total = total; + + protected override Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) + { + var path = request.RequestUri!.AbsolutePath; + + // Waveform profile + sidecar fetches are best-effort side calls — 404 them so the load + // path falls back cleanly and the test stays focused on the audio segment loop. + if (path.EndsWith("/waveform") || path.Contains("/opus/")) + { + return Task.FromResult(new HttpResponseMessage(HttpStatusCode.NotFound)); + } + + var rangeItem = request.Headers.Range!.Ranges.First(); + var from = rangeItem.From ?? 0; + var to = rangeItem.To ?? (_total - 1); + if (to > _total - 1) to = _total - 1; + lock (AudioRanges) AudioRanges.Add((rangeItem.From ?? 0, rangeItem.To)); + + var body = new byte[Math.Max(0, to - from + 1)]; + var response = new HttpResponseMessage(HttpStatusCode.PartialContent) + { + Content = new ByteArrayContent(body), + }; + response.Content.Headers.ContentRange = new ContentRangeHeaderValue(from, to, _total); + response.Content.Headers.ContentType = new MediaTypeHeaderValue("audio/wav"); + return Task.FromResult(response); + } + } + + // Serves the first segment normally, then truncates subsequent segment bodies to a short slice + // (Content-Range reports the correct total, but the HTTP body ends early — simulating a + // connection drop mid-segment while cursor < totalLength). + private sealed class TruncatingAfterFirstSegmentServer : HttpMessageHandler + { + private readonly long _total; + private readonly long _truncatedBodyBytes; // bytes to actually deliver for non-first segments + private int _audioRequestCount; + + public TruncatingAfterFirstSegmentServer(long total, long truncatedBodyBytes) + { + _total = total; + _truncatedBodyBytes = truncatedBodyBytes; + } + + protected override Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) + { + var path = request.RequestUri!.AbsolutePath; + if (path.EndsWith("/waveform") || path.Contains("/opus/")) + return Task.FromResult(new HttpResponseMessage(HttpStatusCode.NotFound)); + + var rangeItem = request.Headers.Range!.Ranges.First(); + var from = rangeItem.From ?? 0; + var to = rangeItem.To ?? (_total - 1); + if (to > _total - 1) to = _total - 1; + + var requestIndex = Interlocked.Increment(ref _audioRequestCount); + // First segment (requestIndex == 1): serve fully. Subsequent segments: truncate. + var fullSliceLength = to - from + 1; + var bodyLength = requestIndex == 1 ? fullSliceLength : Math.Min(_truncatedBodyBytes, fullSliceLength); + var body = new byte[bodyLength]; + + var response = new HttpResponseMessage(HttpStatusCode.PartialContent) + { + Content = new ByteArrayContent(body), + }; + // Content-Range always reports the true full total — the truncation is in the body, not the header. + response.Content.Headers.ContentRange = new ContentRangeHeaderValue(from, to, _total); + response.Content.Headers.ContentType = new MediaTypeHeaderValue("audio/wav"); + return Task.FromResult(response); + } + } + + // Serves the first segment normally, then returns HTTP 500 for all subsequent requests — + // simulating a mid-stream fetch failure after playback is underway. + private sealed class FailingAfterFirstSegmentServer : HttpMessageHandler + { + private readonly long _total; + private int _audioRequestCount; + + public FailingAfterFirstSegmentServer(long total) => _total = total; + + protected override Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) + { + var path = request.RequestUri!.AbsolutePath; + if (path.EndsWith("/waveform") || path.Contains("/opus/")) + return Task.FromResult(new HttpResponseMessage(HttpStatusCode.NotFound)); + + var requestIndex = Interlocked.Increment(ref _audioRequestCount); + if (requestIndex > 1) + return Task.FromResult(new HttpResponseMessage(HttpStatusCode.InternalServerError)); + + var rangeItem = request.Headers.Range!.Ranges.First(); + var from = rangeItem.From ?? 0; + var to = rangeItem.To ?? (_total - 1); + if (to > _total - 1) to = _total - 1; + + var body = new byte[to - from + 1]; + var response = new HttpResponseMessage(HttpStatusCode.PartialContent) + { + Content = new ByteArrayContent(body), + }; + response.Content.Headers.ContentRange = new ContentRangeHeaderValue(from, to, _total); + response.Content.Headers.ContentType = new MediaTypeHeaderValue("audio/wav"); + return Task.FromResult(response); + } + } + + private sealed class SingleClientFactory : IHttpClientFactory + { + private readonly HttpMessageHandler _handler; + public SingleClientFactory(HttpMessageHandler handler) => _handler = handler; + public HttpClient CreateClient(string name) => + new(_handler, disposeHandler: false) { BaseAddress = new Uri("https://content.test/") }; + } + + /// + /// Scriptable JS runtime. processStreamingChunk reports canStart=true immediately (so playback + /// starts on the first chunk) and a productionPaused value pulled from a queue the test controls; + /// isProductionPaused (the inter-segment poll) reads a separate queue so a test can hold the gate + /// closed for N polls then release it. Records reinitializeFromOffset / markStreamComplete calls. + /// + private sealed class FakeJsRuntime : IJSRuntime + { + private readonly Func _chunkProductionPaused; + private readonly Func _isProductionPaused; + private readonly long? _seekByteOffset; + + public FakeJsRuntime( + Func? chunkProductionPaused = null, + Func? isProductionPaused = null, + long? seekByteOffset = null) + { + _chunkProductionPaused = chunkProductionPaused ?? (() => false); + _isProductionPaused = isProductionPaused ?? (() => false); + _seekByteOffset = seekByteOffset; + } + + public int ReinitCallCount { get; private set; } + public int MarkCompleteCallCount { get; private set; } + public int IsProductionPausedCallCount { get; private set; } + public int RecoverCallCount { get; private set; } + + public ValueTask InvokeAsync(string identifier, object?[]? args) + { + switch (identifier) + { + case "DeepDrftAudio.isReady": + return Ok(true); + case "DeepDrftAudio.canDecodeOggOpus": + return Ok(false); // force the lossless path — no sidecar dance + case "DeepDrftAudio.isProductionPaused": + IsProductionPausedCallCount++; + return Ok(_isProductionPaused()); + case "DeepDrftAudio.processStreamingChunk": + return (ValueTask)(object)ValueTask.FromResult(new StreamingResult + { + Success = true, + CanStartStreaming = true, + HeaderParsed = true, + BufferCount = 8, + Duration = 600, + ProductionPaused = _chunkProductionPaused(), + }); + case "DeepDrftAudio.seek": + // When a seek offset is scripted, report seek-beyond-buffer so Seek() routes into + // SeekBeyondBuffer → the shared segment loop with a continuation reinit. + return (ValueTask)(object)ValueTask.FromResult(_seekByteOffset is { } off + ? new SeekResult { Success = true, SeekBeyondBuffer = true, ByteOffset = off } + : new SeekResult { Success = true }); + case "DeepDrftAudio.reinitializeFromOffset": + ReinitCallCount++; + return Result(true); + case "DeepDrftAudio.markStreamComplete": + MarkCompleteCallCount++; + return (ValueTask)(object)ValueTask.FromResult(new StreamingResult { Success = true }); + case "DeepDrftAudio.recoverFromFailedRefill": + RecoverCallCount++; + return Result(true); + default: + // createPlayer / setOnProgressCallback / setOnEndCallback / setVolume / + // ensureAudioContextReady / initializeStreaming / startStreamingPlayback / + // stop / unload / disposePlayer → generic success. + if (typeof(TValue) == typeof(AudioOperationResult)) + return Result(true); + return Ok(default!); + } + } + + public ValueTask InvokeAsync(string identifier, CancellationToken cancellationToken, object?[]? args) + => InvokeAsync(identifier, args); + + private static ValueTask Ok(object? value) => ValueTask.FromResult((TValue)value!); + private static ValueTask Result(bool success) => + ValueTask.FromResult((TValue)(object)new AudioOperationResult { Success = success }); + } + + private static TrackDto Track() => new() { EntryKey = "mix-1", TrackName = "Long Mix" }; + + private static StreamingAudioPlayerService BuildPlayer(HttpMessageHandler server, FakeJsRuntime js) + { + var interop = new AudioInteropService(js); + var media = new TrackMediaClient(new SingleClientFactory(server)); + return new StreamingAudioPlayerService(interop, media, NullLogger.Instance); + } + + [Test] + public async Task ForwardPlayback_FetchesBoundedSegments_AdvancingCursorToEof() + { + // 10 MB file → 3 segments (4 MB, 4 MB, 2 MB tail). No back-pressure: drains straight through. + var total = 10L * 1024 * 1024; + var server = new SegmentServer(total); + var js = new FakeJsRuntime(); + var player = BuildPlayer(server, js); + + await player.SelectTrackStreaming(Track()); + + Assert.Multiple(() => + { + Assert.That(server.AudioRanges, Has.Count.EqualTo(3), + "a 10 MB file at a 4 MB segment size is fetched as three bounded segments"); + + // Contiguous, bounded segments advancing the cursor to EOF. + Assert.That(server.AudioRanges[0], Is.EqualTo((0L, (long?)(SegmentSize - 1)))); + Assert.That(server.AudioRanges[1], Is.EqualTo((SegmentSize, (long?)(2 * SegmentSize - 1)))); + Assert.That(server.AudioRanges[2], Is.EqualTo((2 * SegmentSize, (long?)(3 * SegmentSize - 1)))); + + Assert.That(js.MarkCompleteCallCount, Is.EqualTo(1), "stream-complete fires once at true EOF, not per segment"); + }); + } + + [Test] + public async Task ForwardPlayback_DoesNotFetchNextSegment_WhileProductionPaused() + { + var total = 10L * 1024 * 1024; + var server = new SegmentServer(total); + + // Chunk results report paused=true (so the loop enters the inter-segment gate), and the poll + // stays paused for the first two checks, then releases — so the next segment is delayed, not + // skipped. The gate must hold the SECOND fetch until the poll clears. + var pollChecks = 0; + var js = new FakeJsRuntime( + chunkProductionPaused: () => true, + isProductionPaused: () => ++pollChecks < 3); // paused for polls 1,2; clear on poll 3 + + var player = BuildPlayer(server, js); + + await player.SelectTrackStreaming(Track()); + + Assert.Multiple(() => + { + Assert.That(js.IsProductionPausedCallCount, Is.GreaterThanOrEqualTo(3), + "the inter-segment gate must poll the fill signal while paused, not fetch immediately"); + Assert.That(server.AudioRanges, Has.Count.EqualTo(3), + "once the gate clears, segmentation resumes and still reaches EOF — paused delays, never skips"); + }); + } + + [Test] + public async Task SmallFile_FetchedInOneShortSegment_NoSecondFetch() + { + // 2 MB file < one segment: the first bounded request returns a short slice → EOF, no second fetch. + var total = 2L * 1024 * 1024; + var server = new SegmentServer(total); + var js = new FakeJsRuntime(); + var player = BuildPlayer(server, js); + + await player.SelectTrackStreaming(Track()); + + Assert.Multiple(() => + { + Assert.That(server.AudioRanges, Has.Count.EqualTo(1), "a sub-segment file needs exactly one fetch"); + Assert.That(server.AudioRanges[0], Is.EqualTo((0L, (long?)(SegmentSize - 1))), + "the request is still the bounded segment shape; the server returns the short available tail"); + Assert.That(js.MarkCompleteCallCount, Is.EqualTo(1)); + }); + } + + [Test] + public async Task ForwardLoad_NeverReinitializesDecoder() + { + var total = 20L * 1024 * 1024; + var server = new SegmentServer(total); + var js = new FakeJsRuntime(); + var player = BuildPlayer(server, js); + + await player.SelectTrackStreaming(Track()); + + Assert.Multiple(() => + { + Assert.That(server.AudioRanges, Has.Count.EqualTo(5), "20 MB / 4 MB = five contiguous forward segments"); + Assert.That(js.ReinitCallCount, Is.Zero, + "a forward load from byte 0 never reinitializes the decoder — reinit is the seek-only continuation step"); + }); + } + + [Test] + public async Task MidStream_TruncatedSegment_RoutesToRecovery_NotSilentEof() + { + // 10 MB file → 3 segments. The second segment's body is short (1 MB instead of 4 MB) while + // cursor < totalLength — simulates a connection drop mid-segment. The loop must NOT treat this + // as EOF (must not call MarkStreamComplete) and must route to recovery (scheduler halted) so + // the buffered tail cannot drain into a silent false end. + var total = 10L * 1024 * 1024; + var truncatedBodyBytes = 1L * 1024 * 1024; // 1 MB short body for segment 2 + var server = new TruncatingAfterFirstSegmentServer(total, truncatedBodyBytes); + var js = new FakeJsRuntime(); + var player = BuildPlayer(server, js); + + await player.SelectTrackStreaming(Track()); + + Assert.Multiple(() => + { + Assert.That(js.MarkCompleteCallCount, Is.Zero, + "a truncated non-final segment must NOT be reported as a clean EOF — MarkStreamComplete must not fire"); + Assert.That(js.RecoverCallCount, Is.EqualTo(1), + "a truncated segment while cursor < totalLength is a failure: scheduler must be halted via recovery"); + Assert.That(player.IsLoaded, Is.True, + "recovery leaves the track loaded so the listener can retry — not torn down to unloaded"); + Assert.That(player.IsStreamingMode, Is.True, + "recovery must restore IsStreamingMode=true so Seek() is not wedged (AC6 / Phase 21.3 retry contract)"); + Assert.That(player.IsPaused, Is.True, + "recovery settles into a paused state, not playing"); + Assert.That(player.ErrorMessage, Is.Not.Null.And.Not.Empty, + "recovery surfaces an error message to the UI"); + }); + } + + [Test] + public async Task MidStream_SegmentFetchFailure_RoutesToRecovery_NotSilentFalseEnd() + { + // 10 MB file → 3 segments. The second segment fetch fails (HTTP 500), simulating a network + // error after playback is already underway. The loop must halt the JS scheduler via recovery + // rather than letting the buffered first-segment tail drain into a silent false end (AC6). + var total = 10L * 1024 * 1024; + var server = new FailingAfterFirstSegmentServer(total); + var js = new FakeJsRuntime(); + var player = BuildPlayer(server, js); + + await player.SelectTrackStreaming(Track()); + + Assert.Multiple(() => + { + Assert.That(js.MarkCompleteCallCount, Is.Zero, + "a mid-stream fetch failure must not report clean EOF — MarkStreamComplete must not fire"); + Assert.That(js.RecoverCallCount, Is.EqualTo(1), + "a mid-stream fetch failure must halt the scheduler via recovery, not leave it to drain"); + Assert.That(player.IsLoaded, Is.True, + "recovery leaves the track loaded so the listener can retry — not torn down to unloaded"); + Assert.That(player.IsStreamingMode, Is.True, + "recovery must restore IsStreamingMode=true so Seek() is not wedged (AC6 / Phase 21.3 retry contract)"); + Assert.That(player.IsPaused, Is.True, + "recovery settles into a paused state, not playing"); + Assert.That(player.ErrorMessage, Is.Not.Null.And.Not.Empty, + "recovery surfaces an error message to the UI"); + }); + } + + [Test] + public async Task SeekBeyondBuffer_ReinitsOnceThenSegmentsForwardFromOffset() + { + // 20 MB file. After load, seek to a byte offset 12 MB in: the seek must reinit the decoder once, + // then continue the SAME bounded-segment loop from 12 MB to EOF (no forked fetch path — C1/C5). + var total = 20L * 1024 * 1024; + var seekOffset = 12L * 1024 * 1024; + var server = new SegmentServer(total); + var js = new FakeJsRuntime(seekByteOffset: seekOffset); + var player = BuildPlayer(server, js); + + await player.SelectTrackStreaming(Track()); + var afterLoad = server.AudioRanges.Count; + + await player.Seek(300); // arbitrary time; the scripted seek returns the 12 MB byte offset + + // Segments fetched by the seek/refill: everything after the initial-load segments. + var refillRanges = server.AudioRanges.Skip(afterLoad).ToList(); + + Assert.Multiple(() => + { + Assert.That(js.ReinitCallCount, Is.EqualTo(1), + "a seek-beyond-buffer reinitializes the decoder exactly once for the header-less continuation"); + + Assert.That(refillRanges, Has.Count.EqualTo(2), + "from 12 MB to 20 MB at a 4 MB segment is two bounded segments (4 MB + 4 MB tail)"); + Assert.That(refillRanges[0], Is.EqualTo((seekOffset, (long?)(seekOffset + SegmentSize - 1))), + "the first refill segment is bounded and starts at the resolved seek offset"); + Assert.That(refillRanges[1], Is.EqualTo((seekOffset + SegmentSize, (long?)(seekOffset + 2 * SegmentSize - 1))), + "segmentation continues forward from the seek offset — the same loop, the same bounded shape"); + }); + } +} diff --git a/DeepDrftTests/TrackMediaBoundedRangeTests.cs b/DeepDrftTests/TrackMediaBoundedRangeTests.cs new file mode 100644 index 0000000..3dfcd40 --- /dev/null +++ b/DeepDrftTests/TrackMediaBoundedRangeTests.cs @@ -0,0 +1,96 @@ +using System.Net; +using System.Net.Http.Headers; +using DeepDrftModels.Enums; +using DeepDrftPublic.Client.Clients; + +namespace DeepDrftTests; + +/// +/// Pins the Phase 21 Direction B bounded-Range request shape on . +/// The network-memory bound rests on each forward fetch being a finite bytes=start-end slice (so the +/// browser buffers only one segment), and on the caller learning the file total from the 206 +/// Content-Range header (the EOF boundary the segment cursor advances toward). Both are request/response +/// plumbing the harness can observe directly; the actual browser memory behaviour is Daniel's manual re-run. +/// +[TestFixture] +public class TrackMediaBoundedRangeTests +{ + // Captures the outgoing Range header and returns a 206 with a Content-Range so TotalLength resolves. + private sealed class RangeCapturingHandler : HttpMessageHandler + { + private readonly long _total; + public RangeHeaderValue? CapturedRange { get; private set; } + + public RangeCapturingHandler(long total) => _total = total; + + protected override Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) + { + CapturedRange = request.Headers.Range; + + var from = request.Headers.Range?.Ranges.First().From ?? 0; + var to = request.Headers.Range?.Ranges.First().To ?? (_total - 1); + var body = new byte[to - from + 1]; + + var response = new HttpResponseMessage(HttpStatusCode.PartialContent) + { + Content = new ByteArrayContent(body), + }; + response.Content.Headers.ContentRange = new ContentRangeHeaderValue(from, to, _total); + return Task.FromResult(response); + } + } + + private sealed class SingleClientFactory : IHttpClientFactory + { + private readonly HttpMessageHandler _handler; + public SingleClientFactory(HttpMessageHandler handler) => _handler = handler; + public HttpClient CreateClient(string name) => + new(_handler, disposeHandler: false) { BaseAddress = new Uri("https://content.test/") }; + } + + [Test] + public async Task GetTrackMedia_WithByteEnd_SendsBoundedRange() + { + var handler = new RangeCapturingHandler(total: 100_000_000); + var client = new TrackMediaClient(new SingleClientFactory(handler)); + + await client.GetTrackMedia("track-1", byteOffset: 0, byteEnd: 4 * 1024 * 1024 - 1, format: AudioFormat.Lossless); + + var range = handler.CapturedRange!.Ranges.First(); + Assert.Multiple(() => + { + Assert.That(range.From, Is.EqualTo(0), "bounded request starts at the cursor"); + Assert.That(range.To, Is.EqualTo(4 * 1024 * 1024 - 1), + "bounded request must carry an inclusive end so the server returns a finite slice (one segment)"); + }); + } + + [Test] + public async Task GetTrackMedia_WithoutByteEnd_SendsOpenEndedRange() + { + var handler = new RangeCapturingHandler(total: 100_000_000); + var client = new TrackMediaClient(new SingleClientFactory(handler)); + + await client.GetTrackMedia("track-1", byteOffset: 1024, byteEnd: null, format: AudioFormat.Lossless); + + var range = handler.CapturedRange!.Ranges.First(); + Assert.Multiple(() => + { + Assert.That(range.From, Is.EqualTo(1024), "open-ended request starts at the cursor"); + Assert.That(range.To, Is.Null, "no byteEnd → open-ended bytes=start- (pre-Direction-B shape, kept working)"); + }); + } + + [Test] + public async Task GetTrackMedia_206Response_SurfacesTotalLengthFromContentRange() + { + var handler = new RangeCapturingHandler(total: 970_000_000); + var client = new TrackMediaClient(new SingleClientFactory(handler)); + + var result = await client.GetTrackMedia("track-1", byteOffset: 0, byteEnd: 4 * 1024 * 1024 - 1); + + Assert.That(result.Success, Is.True); + Assert.That(result.Value!.TotalLength, Is.EqualTo(970_000_000), + "the file total (the segment cursor's EOF boundary) must come from the 206 Content-Range"); + } +}