Phase 21 Direction B: bound network memory via Range-segmented forward fetch
Replace the open-ended forward GET with sequential bounded bytes=start-end segments, the next fetched only when the scheduler drains below low-water, so the browser holds ~one segment regardless of file size. Seek converges on the same loop. Strip BP-DIAG.
This commit is contained in:
@@ -20,13 +20,24 @@ public class TrackMediaResponse : IDisposable
|
||||
/// </summary>
|
||||
public string ContentType { get; }
|
||||
|
||||
/// <summary>
|
||||
/// The total file length in bytes, parsed from the 206 response's <c>Content-Range:
|
||||
/// bytes start-end/TOTAL</c> header (Phase 21 Direction B). Null when the server returned
|
||||
/// 200 (no Content-Range) — callers fall back to <see cref="ContentLength"/> as the total.
|
||||
/// This is the EOF boundary the segment loop advances its cursor toward, and the full
|
||||
/// logical length the JS decoder must see (so a bounded segment's small Content-Length
|
||||
/// never trips the decoder's byte-count completion early).
|
||||
/// </summary>
|
||||
public long? TotalLength { get; }
|
||||
|
||||
private readonly HttpResponseMessage _response;
|
||||
|
||||
public TrackMediaResponse(Stream stream, long contentLength, string contentType, HttpResponseMessage response)
|
||||
public TrackMediaResponse(Stream stream, long contentLength, string contentType, long? totalLength, HttpResponseMessage response)
|
||||
{
|
||||
Stream = stream;
|
||||
ContentLength = contentLength;
|
||||
ContentType = contentType;
|
||||
TotalLength = totalLength;
|
||||
_response = response;
|
||||
}
|
||||
|
||||
@@ -54,6 +65,15 @@ public class TrackMediaClient
|
||||
/// token aborts the in-flight server connection rather than leaving the server
|
||||
/// draining bytes into a dead socket.
|
||||
/// <para>
|
||||
/// <paramref name="byteEnd"/> (Phase 21 Direction B) bounds the request to a single
|
||||
/// segment: when set, the Range header is <c>bytes={byteOffset}-{byteEnd}</c> (inclusive),
|
||||
/// so the browser holds at most ~one segment of raw bytes regardless of file size — the
|
||||
/// network-memory bound this phase exists for. When null the request is open-ended
|
||||
/// (<c>bytes={byteOffset}-</c>), the pre-Direction-B behaviour. Either way the response's
|
||||
/// <c>Content-Range</c> total is surfaced via <see cref="TrackMediaResponse.TotalLength"/>
|
||||
/// so the caller knows the EOF boundary and the full logical length the decoder must see.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// <paramref name="format"/> selects the delivery rendering (Phase 18): the default
|
||||
/// <see cref="AudioFormat.Lossless"/> sends no <c>format</c> query param, so existing
|
||||
/// callers hit the byte-identical pre-Phase-18 endpoint; <see cref="AudioFormat.Opus"/>
|
||||
@@ -65,12 +85,13 @@ public class TrackMediaClient
|
||||
public async Task<ApiResult<TrackMediaResponse>> GetTrackMedia(
|
||||
string trackId,
|
||||
long byteOffset = 0,
|
||||
long? byteEnd = null,
|
||||
AudioFormat format = AudioFormat.Lossless,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Same URL for every seek — only the Range header differs. byteOffset 0 is
|
||||
// Same URL for every fetch — only the Range header differs. byteOffset 0 is
|
||||
// not special-cased: "bytes=0-" requests the whole file from the start.
|
||||
// Lossless omits the format param entirely so the request is byte-identical to
|
||||
// the pre-Phase-18 endpoint; only Opus appends ?format=opus.
|
||||
@@ -78,18 +99,19 @@ public class TrackMediaClient
|
||||
? $"api/track/{trackId}"
|
||||
: $"api/track/{trackId}?format={format.ToString().ToLowerInvariant()}";
|
||||
using var request = new HttpRequestMessage(HttpMethod.Get, uri);
|
||||
request.Headers.Range = new RangeHeaderValue(byteOffset, null);
|
||||
// Bounded (byteEnd set) → "bytes=start-end" so the server returns a finite 206
|
||||
// slice and the browser buffers only that segment; open-ended (byteEnd null) →
|
||||
// "bytes=start-". The server honours both via File(..., enableRangeProcessing: true),
|
||||
// which parses the full RFC 7233 range grammar and slices accordingly.
|
||||
request.Headers.Range = new RangeHeaderValue(byteOffset, byteEnd);
|
||||
|
||||
// Stream the response body incrementally instead of buffering it whole (Phase 21.4 fix).
|
||||
// In Blazor WebAssembly the HttpClient is backed by the browser fetch API; without this the
|
||||
// browser buffers the ENTIRE body before the response stream yields a byte, so the 21.2
|
||||
// read-loop pause (StreamingAudioPlayerService) backpressures nothing — the whole payload is
|
||||
// already in memory. Enabling streaming makes ReadAsync pull from a browser ReadableStream
|
||||
// whose backpressure reaches the underlying fetch, so pausing reads genuinely throttles the
|
||||
// network. This is a request-option flag, not a runtime call: on the SSR server-to-server path
|
||||
// the SocketsHttpHandler simply ignores the unknown option, so it is safe unguarded. Applies to
|
||||
// BOTH the initial stream (byteOffset 0) and the seek/refill Range requests (21.3) — both share
|
||||
// this method, so both depend on the same backpressure.
|
||||
// browser buffers the ENTIRE body before the response stream yields a byte. With Direction B
|
||||
// each request is already bounded to one segment, so the body is small regardless — but
|
||||
// streaming still lets us read it incrementally and is harmless on the SSR server-to-server
|
||||
// path (SocketsHttpHandler ignores the unknown option). Kept for both the initial and the
|
||||
// seek/refill paths since both share this method.
|
||||
request.SetBrowserResponseStreamingEnabled(true);
|
||||
|
||||
// Use HttpCompletionOption.ResponseHeadersRead to get stream immediately
|
||||
@@ -100,11 +122,15 @@ public class TrackMediaClient
|
||||
// Default to WAV when the server omits the header — the only format shipping
|
||||
// today — so the JS factory always receives a usable media type.
|
||||
var contentType = response.Content.Headers.ContentType?.MediaType ?? "audio/wav";
|
||||
// Content-Range "bytes start-end/TOTAL" carries the full file length on a 206; on a 200
|
||||
// there is no Content-Range, so TotalLength is null and callers use ContentLength.
|
||||
var totalLength = response.Content.Headers.ContentRange?.Length;
|
||||
var stream = await response.Content.ReadAsStreamAsync(cancellationToken);
|
||||
|
||||
// TrackMediaResponse takes ownership of both stream and response;
|
||||
// do NOT dispose response here — the caller disposes via TrackMediaResponse.Dispose().
|
||||
return ApiResult<TrackMediaResponse>.CreatePassResult(new TrackMediaResponse(stream, contentLength, contentType, response));
|
||||
return ApiResult<TrackMediaResponse>.CreatePassResult(
|
||||
new TrackMediaResponse(stream, contentLength, contentType, totalLength, response));
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
|
||||
@@ -18,12 +18,23 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
|
||||
private const int MaxBufferSize = 64 * 1024; // 64KB maximum
|
||||
|
||||
// Phase 21.2a back-pressure poll interval. While the scheduler is over its forward high-water
|
||||
// mark, the read loop stops calling ReadAsync and polls IsProductionPaused at this cadence
|
||||
// until the fill drains below low-water. 100 ms is well under the low-water lookahead (seconds),
|
||||
// so resume is prompt relative to the playhead — no starvation (AC3) — while keeping the poll
|
||||
// cheap. The poll honors the loop's cancellation token, so a track switch/seek during a pause
|
||||
// exits through the same drain discipline as a pause during ReadAsync (C6).
|
||||
// mark, the segment loop stops fetching the next segment and polls IsProductionPaused at this
|
||||
// cadence until the fill drains below low-water. 100 ms is well under the low-water lookahead
|
||||
// (seconds), so resume is prompt relative to the playhead — no starvation (AC3) — while keeping
|
||||
// the poll cheap. The poll honors the loop's cancellation token, so a track switch/seek during a
|
||||
// pause exits through the same drain discipline as a pause during ReadAsync (C6).
|
||||
private const int BackpressurePollMs = 100;
|
||||
|
||||
// Phase 21 Direction B — forward Range-segment size. The forward stream is fetched as a
|
||||
// sequence of bounded "bytes=cursor-(cursor+SegmentSizeBytes-1)" 206 requests, the next issued
|
||||
// only when the scheduler drains below low-water. Because each request is bounded and fully
|
||||
// consumed before the next is issued, the browser fetch holds AT MOST ~one segment of raw bytes
|
||||
// regardless of file size — this is the network-memory bound the phase exists for (the open-ended
|
||||
// single GET buffered the whole ~970 MB body in the browser even when reads were paused, the
|
||||
// 21.4 finding). 4 MB balances request overhead (a 1 GB mix is ~250 segments) against memory:
|
||||
// at the 30 s high-water mark a fast connection holds well under a segment of unplayed raw bytes,
|
||||
// so the bound is the segment size, not the decoded window. Tunable; not magic.
|
||||
private const long SegmentSizeBytes = 4 * 1024 * 1024;
|
||||
private int _currentBufferSize = DefaultBufferSize;
|
||||
private int _consecutiveSlowReads = 0;
|
||||
|
||||
@@ -35,14 +46,6 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
|
||||
public int BufferedChunks { get; private set; } = 0;
|
||||
public bool IsSeekingBeyondBuffer { get; private set; } = false;
|
||||
|
||||
// ───────────────────────────────────────────────────────────────────────────────────────────
|
||||
// [BP-DIAG] Phase 21.4 back-pressure diagnostic. TEMPORARY — strip once the cause is confirmed
|
||||
// in Daniel's browser run. Logs every Nth chunk's ProductionPaused flag plus pause-poll
|
||||
// enter/exit so a grep for "[BP-DIAG]" in the WASM console tells whether the read loop ever sees
|
||||
// the pause signal and whether the poll actually holds. Throttled by chunk count to avoid flooding.
|
||||
private const int BpDiagChunkLogEvery = 16;
|
||||
// ───────────────────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
private bool _streamingPlaybackStarted = false;
|
||||
private CancellationTokenSource? _streamingCancellation;
|
||||
private Task? _activeStreamingTask;
|
||||
@@ -203,23 +206,29 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
|
||||
// seek-beyond-buffer re-fetch reuses the same artifact.
|
||||
_currentFormat = await ResolveStreamFormatAsync(track.EntryKey, loadCts.Token);
|
||||
|
||||
// Pass the streaming token to the HTTP layer so a navigation/track switch
|
||||
// aborts the server connection instead of leaving it draining bytes.
|
||||
var mediaResult = await _trackMediaClient.GetTrackMedia(
|
||||
// Direction B: fetch the FIRST bounded segment to learn the total file length and the
|
||||
// content type. The 206 Content-Range carries the total; the segment loop advances its
|
||||
// cursor toward it. The decoder is initialized with the TOTAL length (not the segment
|
||||
// length) so a bounded segment's small Content-Length never trips its byte-count
|
||||
// completion early — segment boundaries are invisible to the decoder, which sees one
|
||||
// continuous in-order byte stream. Passing the streaming token aborts the server
|
||||
// connection on a navigation/track switch instead of leaving it draining bytes.
|
||||
var firstSegment = await _trackMediaClient.GetTrackMedia(
|
||||
track.EntryKey,
|
||||
byteOffset: 0,
|
||||
byteEnd: SegmentSizeBytes - 1,
|
||||
format: _currentFormat,
|
||||
cancellationToken: loadCts.Token);
|
||||
if (!mediaResult.Success)
|
||||
if (!firstSegment.Success)
|
||||
{
|
||||
var technicalError = mediaResult.GetMessage();
|
||||
var technicalError = firstSegment.GetMessage();
|
||||
_logger.LogError("Failed to get track media for {TrackId}: {Error}",
|
||||
track.EntryKey, technicalError);
|
||||
ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
|
||||
return;
|
||||
}
|
||||
|
||||
if (mediaResult.Value == null)
|
||||
if (firstSegment.Value == null)
|
||||
{
|
||||
const string technicalError = "No audio returned from server";
|
||||
_logger.LogError("No audio data returned for track {TrackId}", track.EntryKey);
|
||||
@@ -227,13 +236,22 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
|
||||
return;
|
||||
}
|
||||
|
||||
using var audio = mediaResult.Value;
|
||||
// Ownership of the first segment transfers to the segment loop, which disposes it (and
|
||||
// every subsequent segment). No `using` here — a double dispose is avoided and the socket
|
||||
// is released the moment the loop finishes consuming the segment.
|
||||
var audio = firstSegment.Value;
|
||||
|
||||
// Initialize streaming mode with content length and media type (drives
|
||||
// JS format-decoder selection).
|
||||
var streamingResult = await _audioInterop.InitializeStreaming(PlayerId, audio.ContentLength, audio.ContentType);
|
||||
// The total logical length the decoder must see. On a 206 the Content-Range carries it;
|
||||
// a 200 (server ignored Range / file ≤ one segment) has no Content-Range, so fall back to
|
||||
// the body's own Content-Length — that body IS the whole file in that case.
|
||||
var totalLength = audio.TotalLength ?? audio.ContentLength;
|
||||
|
||||
// Initialize streaming mode with the TOTAL length and media type (drives JS
|
||||
// format-decoder selection). See above: total, not segment, length.
|
||||
var streamingResult = await _audioInterop.InitializeStreaming(PlayerId, totalLength, audio.ContentType);
|
||||
if (!streamingResult.Success)
|
||||
{
|
||||
audio.Dispose();
|
||||
var technicalError = $"Failed to initialize streaming: {streamingResult.Error}";
|
||||
_logger.LogError("Streaming initialization failed for track {TrackId}: {Error}",
|
||||
track.EntryKey, technicalError);
|
||||
@@ -241,7 +259,10 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
|
||||
return;
|
||||
}
|
||||
|
||||
_activeStreamingTask = StreamAudioWithEarlyPlayback(audio, loadCts.Token);
|
||||
// Forward segmentation from byte 0. The first segment is already in hand; the loop pumps
|
||||
// it, then fetches subsequent bounded segments gated on the scheduler fill signal.
|
||||
_activeStreamingTask = RunSegmentedStreamAsync(
|
||||
track.EntryKey, audio, cursor: 0, totalLength, seekPosition: null, loadCts.Token);
|
||||
await _activeStreamingTask;
|
||||
}
|
||||
catch (OperationCanceledException) when (loadCts.IsCancellationRequested)
|
||||
@@ -385,176 +406,208 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
|
||||
return profile;
|
||||
}
|
||||
|
||||
private async Task StreamAudioWithEarlyPlayback(TrackMediaResponse audio, CancellationToken cancellationToken)
|
||||
/// <summary>
|
||||
/// Phase 21 Direction B — the single segmented forward read loop, shared by the initial load and
|
||||
/// the seek/refill path (the convergence C1/C5 require: one cursor, one fetch mechanism, no forked
|
||||
/// path). It pumps the FIRST segment (already fetched by the caller), then fetches subsequent
|
||||
/// bounded <c>bytes=cursor-(cursor+SegmentSizeBytes-1)</c> 206 segments — each only AFTER the
|
||||
/// scheduler drains below low-water — until the cursor reaches <paramref name="totalLength"/>.
|
||||
/// Because each segment is bounded and fully consumed before the next is requested, the browser
|
||||
/// holds at most ~one segment of raw bytes (the network-memory bound), while the decoder sees one
|
||||
/// continuous in-order byte stream across segment boundaries (the demuxer/decoder buffer partial
|
||||
/// frames/pages across the boundary exactly as for arbitrary chunks today — no per-segment reinit).
|
||||
/// </summary>
|
||||
/// <param name="firstSegment">The already-fetched first segment (byte <paramref name="cursor"/>).
|
||||
/// Owned by this method, which disposes it; subsequent segments are fetched and disposed inline.</param>
|
||||
/// <param name="cursor">File-absolute byte offset the first segment starts at (0 for a fresh load,
|
||||
/// the resolved seek offset for a refill).</param>
|
||||
/// <param name="totalLength">Total file length in bytes — the EOF boundary the cursor advances
|
||||
/// toward. The decoder is initialized/reinitialized against this, not the per-segment length.</param>
|
||||
/// <param name="seekPosition">Non-null for a seek/refill: the decoder is reinitialized for the
|
||||
/// header-less Range continuation at this time before the first segment's bytes are fed (WAV
|
||||
/// retains its header, Opus re-applies the cached setup + lead-trim). Null for a forward load from
|
||||
/// byte 0, where the first segment carries the header and no reinit is needed.</param>
|
||||
private async Task RunSegmentedStreamAsync(
|
||||
string trackId,
|
||||
TrackMediaResponse firstSegment,
|
||||
long cursor,
|
||||
long totalLength,
|
||||
double? seekPosition,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
byte[]? buffer = null;
|
||||
var segment = firstSegment;
|
||||
try
|
||||
{
|
||||
long totalBytesRead = 0;
|
||||
buffer = ArrayPool<byte>.Shared.Rent(MaxBufferSize); // Rent larger buffer to accommodate adaptive sizing
|
||||
int currentBytes;
|
||||
var readTimer = System.Diagnostics.Stopwatch.StartNew();
|
||||
var bpDiagChunkIndex = 0; // [BP-DIAG] per-stream chunk counter for throttled logging
|
||||
|
||||
do
|
||||
// Seek/refill: reinitialize the active decoder for the header-less continuation ONCE,
|
||||
// before any continuation bytes are fed. Forward-from-zero (seekPosition null) skips this
|
||||
// — its first segment carries the real header the decoder parses. Done here, inside the
|
||||
// single loop, so seek and forward share the same fetch+pump mechanism (no forked path).
|
||||
if (seekPosition is { } resumeAt)
|
||||
{
|
||||
readTimer.Restart();
|
||||
currentBytes = await audio.Stream.ReadAsync(buffer, 0, _currentBufferSize, cancellationToken);
|
||||
readTimer.Stop();
|
||||
|
||||
// Adapt buffer size based on read performance
|
||||
AdaptBufferSize(currentBytes, readTimer.ElapsedMilliseconds);
|
||||
|
||||
if (currentBytes > 0)
|
||||
// The decoder byte-counts the header-less continuation against the bytes REMAINING
|
||||
// from the range start to EOF (total − cursor), not the absolute total — that is what
|
||||
// reinitializeForRangeContinuation expects (StreamDecoder.remainingByteLength). The
|
||||
// loop's own cursor still targets the absolute totalLength for EOF.
|
||||
var remainingBytes = Math.Max(0, totalLength - cursor);
|
||||
var reinitResult = await _audioInterop.ReinitializeFromOffset(PlayerId, remainingBytes, resumeAt);
|
||||
if (!reinitResult.Success)
|
||||
{
|
||||
totalBytesRead += currentBytes;
|
||||
|
||||
// Always slice to the exact number of bytes read. The pooled buffer
|
||||
// is rented at MaxBufferSize and may carry stale bytes past
|
||||
// currentBytes from a prior rental — handing the full array to JS
|
||||
// interop would serialise that garbage into the audio stream.
|
||||
var actualBuffer = buffer.AsSpan(0, currentBytes).ToArray();
|
||||
|
||||
// Process chunk for streaming
|
||||
var chunkResult = await _audioInterop.ProcessStreamingChunk(PlayerId, actualBuffer);
|
||||
if (!chunkResult.Success)
|
||||
{
|
||||
var error = $"Failed to process streaming chunk: {chunkResult.Error}";
|
||||
_logger.LogWarning("Chunk processing failed: {Error}", error);
|
||||
throw new Exception(error);
|
||||
}
|
||||
|
||||
// Update streaming state
|
||||
CanStartStreaming = chunkResult.CanStartStreaming;
|
||||
HeaderParsed = chunkResult.HeaderParsed;
|
||||
BufferedChunks = chunkResult.BufferCount;
|
||||
|
||||
// [BP-DIAG] Phase 21.4 — throttled per-chunk view of the back-pressure signal as
|
||||
// the C# loop sees it. If ProductionPaused never logs True while bytes keep
|
||||
// flowing, the break is upstream (JS latch / lookahead math); if it logs True but
|
||||
// the transfer still races to 100%, the break is the transport (browser buffered
|
||||
// the whole body, SetBrowserResponseStreamingEnabled not in effect). TEMPORARY.
|
||||
if (bpDiagChunkIndex % BpDiagChunkLogEvery == 0)
|
||||
{
|
||||
_logger.LogInformation(
|
||||
"[BP-DIAG] chunk #{Chunk} bytesRead={Bytes} totalRead={Total} bufferCount={BufCount} canStart={CanStart} productionPaused={Paused} isPaused={IsPaused}",
|
||||
bpDiagChunkIndex, currentBytes, totalBytesRead, chunkResult.BufferCount,
|
||||
chunkResult.CanStartStreaming, chunkResult.ProductionPaused, IsPaused);
|
||||
}
|
||||
bpDiagChunkIndex++;
|
||||
|
||||
// Set duration from WAV header when available (only set once)
|
||||
if (chunkResult.Duration.HasValue && Duration == null)
|
||||
{
|
||||
Duration = chunkResult.Duration.Value;
|
||||
_logger.LogInformation("Duration set from WAV header: {Duration:F2} seconds", Duration);
|
||||
// Feed the same once-only duration to the play session so it can compute the
|
||||
// completion fraction at close. Safe before/after session open — SetDuration
|
||||
// is a no-op when no session is open and idempotent otherwise.
|
||||
_playTracker?.SetDuration(chunkResult.Duration.Value);
|
||||
}
|
||||
|
||||
// Start playback as soon as we can
|
||||
if (!_streamingPlaybackStarted && CanStartStreaming)
|
||||
{
|
||||
var playbackResult = await _audioInterop.StartStreamingPlayback(PlayerId);
|
||||
if (playbackResult.Success)
|
||||
{
|
||||
_streamingPlaybackStarted = true;
|
||||
IsPlaying = true;
|
||||
IsPaused = false;
|
||||
IsLoaded = true; // Track is loaded and ready to play (even if still downloading)
|
||||
ErrorMessage = null;
|
||||
|
||||
// Open the play session exactly once per load, at the moment playback truly
|
||||
// begins (§2.1). The _sessionOpened guard keeps the SeekBeyondBuffer re-stream
|
||||
// — which re-enters this transition with _streamingPlaybackStarted reset —
|
||||
// from opening a second session for the same play. Duration may already be
|
||||
// known from a prior chunk, so re-feed it after opening.
|
||||
if (!_sessionOpened && _currentTrackId is { } trackKey)
|
||||
{
|
||||
_sessionOpened = true;
|
||||
_playTracker?.OnPlaybackStarted(trackKey);
|
||||
if (Duration is { } d)
|
||||
_playTracker?.SetDuration(d);
|
||||
}
|
||||
|
||||
await NotifyStateChanged(); // Immediate notification for critical state change
|
||||
}
|
||||
else
|
||||
{
|
||||
var technicalError = $"Failed to start streaming playback: {playbackResult.Error}";
|
||||
_logger.LogError("Failed to start playback: {Error}", technicalError);
|
||||
ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
|
||||
}
|
||||
}
|
||||
|
||||
// Update progress
|
||||
if (audio.ContentLength > 0)
|
||||
{
|
||||
LoadProgress = Math.Min(1.0, (double)totalBytesRead / audio.ContentLength);
|
||||
}
|
||||
|
||||
await ThrottledNotifyStateChanged();
|
||||
|
||||
// Phase 21.2a back-pressure (serves BOTH paths). The chunk we just processed
|
||||
// reported the scheduler's forward fill is over the high-water mark — stop
|
||||
// reading the socket so the unplayed decoded region stays bounded. Pausing
|
||||
// ReadAsync lets the kernel TCP window close (we are working WITH transport flow
|
||||
// control, not against it). Poll until the fill drains below low-water, then
|
||||
// resume the loop. For WAV this is the whole story (StreamDecoder decodes
|
||||
// synchronously into the scheduler); the Opus feed additionally self-throttles
|
||||
// its demux/decode off the SAME signal (21.2b), so its upstream queues stay
|
||||
// near-empty too. The poll awaits on cancellationToken, so a track switch/seek
|
||||
// mid-pause throws OCE and unwinds through the existing drain discipline (C6) —
|
||||
// no separate cancellation path, no stale read racing a reinit.
|
||||
if (chunkResult.ProductionPaused)
|
||||
{
|
||||
// [BP-DIAG] Phase 21.4 — the read loop is ENTERING the pause-poll: reads have
|
||||
// stopped, the socket should now stall and the transfer should plateau. If you
|
||||
// see this line but the network transfer still completes, the transport is
|
||||
// buffered (streaming flag not in effect). TEMPORARY.
|
||||
_logger.LogInformation(
|
||||
"[BP-DIAG] ENTER pause-poll at chunk #{Chunk} totalRead={Total} isPaused={IsPaused}",
|
||||
bpDiagChunkIndex, totalBytesRead, IsPaused);
|
||||
var bpDiagPollCount = 0;
|
||||
|
||||
// UC5: while the user is paused, the playhead is frozen so forward lookahead
|
||||
// never shrinks and the poll would spin indefinitely. Wait here until playback
|
||||
// resumes (IsPaused clears) OR the fill drains on its own. Cancellation is
|
||||
// unchanged: a track switch/seek/stop while paused still throws OCE and unwinds
|
||||
// through the existing drain discipline (C6) — no weakening of the cancel path.
|
||||
while (IsPaused || await _audioInterop.IsProductionPaused(PlayerId))
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
// [BP-DIAG] Phase 21.4 — heartbeat every ~1 s (10 × 100 ms) so a held poll
|
||||
// is visible without flooding; shows the loop is genuinely parked. TEMPORARY.
|
||||
if (bpDiagPollCount % 10 == 0)
|
||||
{
|
||||
_logger.LogInformation(
|
||||
"[BP-DIAG] HOLD pause-poll iter={Iter} isPaused={IsPaused}",
|
||||
bpDiagPollCount, IsPaused);
|
||||
}
|
||||
bpDiagPollCount++;
|
||||
await Task.Delay(BackpressurePollMs, cancellationToken);
|
||||
}
|
||||
|
||||
// [BP-DIAG] Phase 21.4 — the read loop is EXITING the pause-poll and resuming
|
||||
// ReadAsync: the fill drained below low-water. TEMPORARY.
|
||||
_logger.LogInformation(
|
||||
"[BP-DIAG] EXIT pause-poll at chunk #{Chunk} after {Iters} polls",
|
||||
bpDiagChunkIndex, bpDiagPollCount);
|
||||
}
|
||||
throw new Exception($"Failed to reinitialize for offset streaming: {reinitResult.Error}");
|
||||
}
|
||||
} while (currentBytes > 0);
|
||||
}
|
||||
|
||||
// Notify the JS decoder that the stream is finished. When the server omits
|
||||
// Content-Length the StreamDecoder cannot determine completion via byte counting
|
||||
// alone; this explicit signal ensures the tail-decoding path (streamComplete=true)
|
||||
// fires regardless of whether Content-Length was present.
|
||||
buffer = ArrayPool<byte>.Shared.Rent(MaxBufferSize); // larger rental to fit adaptive sizing
|
||||
var readTimer = System.Diagnostics.Stopwatch.StartNew();
|
||||
|
||||
// Segment loop. Each iteration fully consumes one bounded 206 body, advancing the cursor by
|
||||
// the bytes received. The next segment is fetched only when the scheduler is below
|
||||
// high-water (the inter-segment gate). EOF is the cursor reaching totalLength, or a short
|
||||
// segment (server returned fewer bytes than requested — the final slice).
|
||||
while (true)
|
||||
{
|
||||
long segmentBytesRead = 0;
|
||||
int currentBytes;
|
||||
do
|
||||
{
|
||||
readTimer.Restart();
|
||||
currentBytes = await segment.Stream.ReadAsync(buffer, 0, _currentBufferSize, cancellationToken);
|
||||
readTimer.Stop();
|
||||
|
||||
AdaptBufferSize(currentBytes, readTimer.ElapsedMilliseconds);
|
||||
|
||||
if (currentBytes > 0)
|
||||
{
|
||||
segmentBytesRead += currentBytes;
|
||||
|
||||
// Slice to the exact bytes read: the pooled buffer is rented at MaxBufferSize
|
||||
// and may carry stale bytes past currentBytes from a prior rental — handing the
|
||||
// full array to JS would serialise that garbage into the audio stream.
|
||||
var actualBuffer = buffer.AsSpan(0, currentBytes).ToArray();
|
||||
|
||||
var chunkResult = await _audioInterop.ProcessStreamingChunk(PlayerId, actualBuffer);
|
||||
if (!chunkResult.Success)
|
||||
{
|
||||
var error = $"Failed to process streaming chunk: {chunkResult.Error}";
|
||||
_logger.LogWarning("Chunk processing failed: {Error}", error);
|
||||
throw new Exception(error);
|
||||
}
|
||||
|
||||
CanStartStreaming = chunkResult.CanStartStreaming;
|
||||
HeaderParsed = chunkResult.HeaderParsed;
|
||||
BufferedChunks = chunkResult.BufferCount;
|
||||
|
||||
// Set duration from header when available (only set once)
|
||||
if (chunkResult.Duration.HasValue && Duration == null)
|
||||
{
|
||||
Duration = chunkResult.Duration.Value;
|
||||
_logger.LogInformation("Duration set from header: {Duration:F2} seconds", Duration);
|
||||
// Feed the once-only duration to the play session for the completion
|
||||
// fraction. No-op when no session is open; idempotent otherwise.
|
||||
_playTracker?.SetDuration(chunkResult.Duration.Value);
|
||||
}
|
||||
|
||||
// Start playback as soon as we can — at the min-buffer threshold, exactly as
|
||||
// before (C2: first audio is not gated on the segment boundary; the first
|
||||
// segment alone clears the threshold).
|
||||
if (!_streamingPlaybackStarted && CanStartStreaming)
|
||||
{
|
||||
var playbackResult = await _audioInterop.StartStreamingPlayback(PlayerId);
|
||||
if (playbackResult.Success)
|
||||
{
|
||||
_streamingPlaybackStarted = true;
|
||||
IsPlaying = true;
|
||||
IsPaused = false;
|
||||
IsLoaded = true; // loaded and ready, even while still downloading
|
||||
ErrorMessage = null;
|
||||
|
||||
// Open the play session exactly once per load, at the moment playback
|
||||
// truly begins (§2.1). The _sessionOpened guard keeps a seek/refill
|
||||
// re-stream — which re-enters this transition with
|
||||
// _streamingPlaybackStarted reset — from opening a second session for
|
||||
// the same play. Duration may already be known, so re-feed it.
|
||||
if (!_sessionOpened && _currentTrackId is { } trackKey)
|
||||
{
|
||||
_sessionOpened = true;
|
||||
_playTracker?.OnPlaybackStarted(trackKey);
|
||||
if (Duration is { } d)
|
||||
_playTracker?.SetDuration(d);
|
||||
}
|
||||
|
||||
await NotifyStateChanged(); // immediate — critical state change
|
||||
}
|
||||
else
|
||||
{
|
||||
var technicalError = $"Failed to start streaming playback: {playbackResult.Error}";
|
||||
_logger.LogError("Failed to start playback: {Error}", technicalError);
|
||||
ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
|
||||
}
|
||||
}
|
||||
|
||||
// Progress against the total file length (cursor + bytes consumed so far).
|
||||
if (totalLength > 0)
|
||||
{
|
||||
LoadProgress = Math.Min(1.0, (double)(cursor + segmentBytesRead) / totalLength);
|
||||
}
|
||||
|
||||
await ThrottledNotifyStateChanged();
|
||||
}
|
||||
} while (currentBytes > 0);
|
||||
|
||||
// Segment fully consumed; advance the cursor and release this segment's stream/socket
|
||||
// before deciding whether to fetch the next. Disposing here keeps exactly one segment's
|
||||
// raw bytes resident at a time.
|
||||
cursor += segmentBytesRead;
|
||||
segment.Dispose();
|
||||
segment = null!;
|
||||
|
||||
// EOF: cursor reached the total, OR the server returned a short final slice (fewer
|
||||
// bytes than the segment we requested). Either way there is nothing left to fetch.
|
||||
var reachedTotal = totalLength > 0 && cursor >= totalLength;
|
||||
var shortSegment = segmentBytesRead < SegmentSizeBytes;
|
||||
if (reachedTotal || shortSegment)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
// Inter-segment back-pressure gate (Phase 21.2 fill signal, now gating SEGMENT FETCH
|
||||
// instead of pacing ReadAsync on an open stream). Do not fetch the next segment while
|
||||
// the scheduler is over high-water; wait until it drains below low-water. Because the
|
||||
// browser only buffers bounded segments and we hold off requesting the next one, raw
|
||||
// network memory stays at ~one segment. The poll awaits on cancellationToken, so a
|
||||
// track switch/seek mid-wait throws OCE and unwinds through the existing drain
|
||||
// discipline (C6). UC5: a user pause freezes the playhead so the fill never drains —
|
||||
// hold here until playback resumes (IsPaused clears) OR the fill drains on its own.
|
||||
while (IsPaused || await _audioInterop.IsProductionPaused(PlayerId))
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
await Task.Delay(BackpressurePollMs, cancellationToken);
|
||||
}
|
||||
|
||||
// Fetch the next bounded segment. The end offset is clamped implicitly by the server
|
||||
// (a request past EOF yields the available tail as a short slice, caught above).
|
||||
var nextEnd = cursor + SegmentSizeBytes - 1;
|
||||
var nextResult = await _trackMediaClient.GetTrackMedia(
|
||||
trackId,
|
||||
byteOffset: cursor,
|
||||
byteEnd: nextEnd,
|
||||
format: _currentFormat,
|
||||
cancellationToken: cancellationToken);
|
||||
if (!nextResult.Success || nextResult.Value == null)
|
||||
{
|
||||
var technicalError = nextResult.GetMessage() ?? "Failed to fetch next stream segment";
|
||||
_logger.LogError("Failed to fetch segment at offset {Offset} for {TrackId}: {Error}",
|
||||
cursor, trackId, technicalError);
|
||||
throw new Exception(technicalError);
|
||||
}
|
||||
segment = nextResult.Value;
|
||||
}
|
||||
|
||||
// Notify the JS decoder that the stream is finished. The decoder marks completion by byte
|
||||
// count against the total it was initialized with; this explicit signal flushes the
|
||||
// residual tail and covers the (rare) case where the total was unknown.
|
||||
await _audioInterop.MarkStreamCompleteAsync(PlayerId);
|
||||
|
||||
// Mark as fully loaded
|
||||
LoadProgress = 1.0;
|
||||
await NotifyStateChanged();
|
||||
}
|
||||
@@ -565,7 +618,7 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
StreamingErrorHandler.LogError(_logger, ex, "StreamAudioWithEarlyPlayback");
|
||||
StreamingErrorHandler.LogError(_logger, ex, "RunSegmentedStreamAsync");
|
||||
ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(ex.Message);
|
||||
LoadProgress = 0;
|
||||
IsLoaded = false;
|
||||
@@ -575,6 +628,8 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
|
||||
}
|
||||
finally
|
||||
{
|
||||
// Release the last segment (if a fetch failed mid-loop it may still be held) and the buffer.
|
||||
segment?.Dispose();
|
||||
if (buffer != null)
|
||||
{
|
||||
ArrayPool<byte>.Shared.Return(buffer);
|
||||
@@ -653,6 +708,10 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
|
||||
return;
|
||||
}
|
||||
|
||||
// Capture into a non-null local: _currentTrackId is the field a track-switch could clear, but
|
||||
// this seek operates against the track loaded NOW; the segment loop needs a stable id.
|
||||
var trackId = _currentTrackId;
|
||||
|
||||
IsSeekingBeyondBuffer = true;
|
||||
|
||||
// Cancel the current streaming loop AND wait for it to fully exit before
|
||||
@@ -691,19 +750,22 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
|
||||
CurrentTime = seekPosition;
|
||||
await NotifyStateChanged();
|
||||
|
||||
// Request new stream from offset. Reuse the format the initial load resolved to (_currentFormat):
|
||||
// an Opus seek must come back as Opus bytes so the cached setup header + page-aligned byteOffset
|
||||
// (resolved by the JS decoder's index-based calculateByteOffset) match the continuation. The
|
||||
// offset itself is computed JS-side from the Opus seek index for Opus, exactly as it is from the
|
||||
// WAV header for lossless — one seam, format-appropriate math (AC9 / §3.4a C).
|
||||
var mediaResult = await _trackMediaClient.GetTrackMedia(
|
||||
_currentTrackId,
|
||||
// Request the FIRST bounded segment from the resolved offset (Direction B — converged with
|
||||
// the forward path). Reuse the format the initial load resolved to (_currentFormat): an
|
||||
// Opus seek must come back as Opus bytes so the cached setup header + page-aligned
|
||||
// byteOffset (resolved JS-side from the Opus seek index) match the continuation; WAV resolves
|
||||
// its offset from the header — one seam, format-appropriate math (AC9 / §3.4a C). The
|
||||
// segment loop then continues forward segmentation from this offset exactly as a fresh load
|
||||
// does from 0 — no forked fetch path (C1/C5).
|
||||
var firstSegment = await _trackMediaClient.GetTrackMedia(
|
||||
trackId,
|
||||
byteOffset,
|
||||
byteEnd: byteOffset + SegmentSizeBytes - 1,
|
||||
format: _currentFormat,
|
||||
cancellationToken: seekCts.Token);
|
||||
if (!mediaResult.Success || mediaResult.Value == null)
|
||||
if (!firstSegment.Success || firstSegment.Value == null)
|
||||
{
|
||||
var technicalError = mediaResult.GetMessage() ?? "Failed to load audio from position";
|
||||
var technicalError = firstSegment.GetMessage() ?? "Failed to load audio from position";
|
||||
_logger.LogError("Failed to get track media from offset {Offset}: {Error}", byteOffset, technicalError);
|
||||
// Guard: a superseded seek must NOT touch shared state. The newer seek owns teardown.
|
||||
if (IsStillActiveSeek())
|
||||
@@ -717,33 +779,24 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
|
||||
return;
|
||||
}
|
||||
|
||||
using var audio = mediaResult.Value;
|
||||
var audio = firstSegment.Value;
|
||||
// The absolute EOF boundary the segment loop's cursor targets. On a 206 the Content-Range
|
||||
// carries the file total; on a 200 (single-segment file) fall back to cursor + body length.
|
||||
var totalLength = audio.TotalLength ?? (byteOffset + audio.ContentLength);
|
||||
|
||||
// Reinitialize JS player for offset streaming
|
||||
var reinitResult = await _audioInterop.ReinitializeFromOffset(PlayerId, audio.ContentLength, seekPosition);
|
||||
if (!reinitResult.Success)
|
||||
{
|
||||
_logger.LogError("Failed to reinitialize for offset streaming: {Error}", reinitResult.Error);
|
||||
// Guard: same single-writer discipline — only recover when we are still the active seek.
|
||||
if (IsStillActiveSeek())
|
||||
{
|
||||
await RecoverFromFailedRefill(seekPosition, "Failed to seek to position");
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.LogDebug("Reinit failed on superseded seek to {Position} — newer seek owns state, skipping recovery", seekPosition);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Reset streaming state for new stream
|
||||
// Reset streaming state for the new stream. The decoder reinit for the header-less
|
||||
// continuation happens INSIDE RunSegmentedStreamAsync (seekPosition non-null), so seek and
|
||||
// forward share one fetch+pump+reinit mechanism. A reinit failure there throws and lands in
|
||||
// the catch below, which recovers when still the active seek — the same clean-failure path
|
||||
// (AC6) the old explicit reinit branch had, now unified with the fetch-failure path.
|
||||
_streamingPlaybackStarted = false;
|
||||
CanStartStreaming = false;
|
||||
HeaderParsed = false;
|
||||
BufferedChunks = 0;
|
||||
|
||||
// Stream audio from offset
|
||||
_activeStreamingTask = StreamAudioWithEarlyPlayback(audio, seekCts.Token);
|
||||
// Stream from offset via the shared segment loop. Ownership of `audio` transfers to it.
|
||||
_activeStreamingTask = RunSegmentedStreamAsync(
|
||||
trackId, audio, cursor: byteOffset, totalLength, seekPosition, seekCts.Token);
|
||||
await _activeStreamingTask;
|
||||
|
||||
IsSeekingBeyondBuffer = false;
|
||||
|
||||
Reference in New Issue
Block a user