Files
deepdrft/DeepDrftPublic.Client/Services/StreamingAudioPlayerService.cs
T
daniel-c-harvey be9de8d77c Collapse duplicate same-track streaming loads to enforce one load per play
A second LoadTrackStreaming for the same in-flight track (UI double-fire, queue re-entry, or JS false-end auto-advance) is now dropped; a different-track load still supersedes. Targets the Opus double-load; keeps load-gen diagnostics.
2026-06-24 23:08:58 -04:00

1162 lines
61 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using DeepDrftModels.DTOs;
using DeepDrftModels.Enums;
using DeepDrftPublic.Client.Clients;
using System.Buffers;
using Microsoft.Extensions.Logging;
using Microsoft.JSInterop;
namespace DeepDrftPublic.Client.Services;
public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerService
{
// Configuration constants
private const int DefaultBufferSize = 32 * 1024; // 32KB chunks
private const int NotificationThrottleMs = 100; // Throttle UI updates to max 10 per second
// Adaptive chunk sizing
private const int MinBufferSize = 16 * 1024; // 16KB minimum
private const int MaxBufferSize = 64 * 1024; // 64KB maximum
// Phase 21.2a back-pressure poll interval. While the scheduler is over its forward high-water
// mark, the segment loop stops fetching the next segment and polls IsProductionPaused at this
// cadence until the fill drains below low-water. 100 ms is well under the low-water lookahead
// (seconds), so resume is prompt relative to the playhead — no starvation (AC3) — while keeping
// the poll cheap. The poll honors the loop's cancellation token, so a track switch/seek during a
// pause exits through the same drain discipline as a pause during ReadAsync (C6).
private const int BackpressurePollMs = 100;
// Phase 21 Direction B — forward Range-segment size. The forward stream is fetched as a
// sequence of bounded "bytes=cursor-(cursor+SegmentSizeBytes-1)" 206 requests, the next issued
// only when the scheduler drains below low-water. Because each request is bounded and fully
// consumed before the next is issued, the browser fetch holds AT MOST ~one segment of raw bytes
// regardless of file size — this is the network-memory bound the phase exists for (the open-ended
// single GET buffered the whole ~970 MB body in the browser even when reads were paused, the
// 21.4 finding). 4 MB balances request overhead (a 1 GB mix is ~250 segments) against memory:
// at the 30 s high-water mark a fast connection holds well under a segment of unplayed raw bytes,
// so the bound is the segment size, not the decoded window. Tunable; not magic.
private const long SegmentSizeBytes = 4 * 1024 * 1024;
// Byte count requested per ReadAsync in the segment loop. Reset to DefaultBufferSize at the start
// of every load and passed through AdaptBufferSize after each read (presumably clamped to
// [MinBufferSize, MaxBufferSize] there — confirm). The pooled read buffer is rented at
// MaxBufferSize so any value up to that fits.
private int _currentBufferSize = DefaultBufferSize;
// Slow-read streak, reset to 0 per load. Presumably consumed by AdaptBufferSize to decide when to
// resize _currentBufferSize — TODO confirm against that helper.
private int _consecutiveSlowReads = 0;
// Streaming state properties
// True while a streaming load owns the player: set at load start, cleared when the active load
// fails or is cancelled.
public bool IsStreamingMode { get; private set; } = false;
// Mirrored from each ProcessStreamingChunk result: the JS side has buffered enough to start.
public bool CanStartStreaming { get; private set; } = false;
// Mirrored from each ProcessStreamingChunk result: the JS decoder has parsed the stream header.
public bool HeaderParsed { get; private set; } = false;
// Mirrored from each ProcessStreamingChunk result (chunkResult.BufferCount).
public int BufferedChunks { get; private set; } = 0;
// Not written by the load path; presumably maintained by the seek-beyond-buffer path — confirm.
public bool IsSeekingBeyondBuffer { get; private set; } = false;
// Latches once StartStreamingPlayback succeeds. It gates the per-chunk back-pressure drain and
// selects the mid-stream recovery path on failure.
private bool _streamingPlaybackStarted = false;
// CTS of the operation that currently owns the stream (a load or a seek). Loads compare their own
// CTS against it by reference to learn whether they are still the active operation.
private CancellationTokenSource? _streamingCancellation;
// The segment-loop task of the current operation. ResetToIdle is documented as awaiting the
// in-flight loop, presumably through this field.
private Task? _activeStreamingTask;
// Time of the last UI notification. Presumably read by ThrottledNotifyStateChanged together with
// NotificationThrottleMs — confirm.
private DateTime _lastNotification = DateTime.MinValue;
private readonly ILogger<StreamingAudioPlayerService> _logger;
// Track key of the loaded/loading track. Saved for seek operations and cleared by ResetToIdle.
private string? _currentTrackId;
// The track key of the load currently in flight. It is set at LoadTrackStreaming entry BEFORE
// ResetToIdle (which clears _currentTrackId) and released by that load's finally; see
// LoadTrackStreaming for the exact release condition. It is the idempotency key that collapses a
// duplicate same-track entry for ONE play action into the first load. A second
// SelectTrackStreaming for the SAME track while its load is still in flight is a redundant
// re-dispatch (UI double-fire, queue re-entry, or a JS false-end auto-advance back onto the same
// track), not a real track switch, so it is dropped. A load for a DIFFERENT track is a genuine
// switch and supersedes as before — this guard never suppresses it.
// Null when no load is in flight.
private string? _loadInFlightTrackId;
// Monotonic load-generation counter. It is incremented once for every LoadTrackStreaming call that
// gets past the single-load guard (after its ResetToIdle), and stamped into that load's logs.
// Two loads for ONE user play action — the "Duration set from header logged twice" double-load
// hypothesis that needs in-browser confirmation — are then unmistakable. A single play should show
// exactly one "Streaming load #N started"/"finished" pair; two overlapping starts for one click
// pin the re-entrancy. Because it only ever increases, a load comparing its captured id with the
// current value can also tell whether a newer load has started since. Cheap (an int per load).
private int _loadGeneration;
// The delivery format the active load resolved to (Phase 18). Captured once per LoadTrackStreaming and
// reused by the seek-beyond-buffer re-fetch so the Range continuation requests the SAME artifact the
// initial stream did — a seek must never switch formats mid-track (the JS decoder, the cached setup
// header, and the byte offsets all belong to one artifact). Defaults to Lossless until a load resolves.
private AudioFormat _currentFormat = AudioFormat.Lossless;
// Phase 16 play-session telemetry (§2.1). The tracker observes the playback lifecycle and emits at
// most one bucketed play event per session, behind the engagement floor. Attached after construction
// by AudioPlayerProvider (the player is not DI-registered), mirroring how QueueService binds — no
// constructor growth propagated through DI, no construction cycle. Null when telemetry is not wired
// (e.g. unit tests that construct the player without it), so every call is null-guarded.
private PlayTracker? _playTracker;
// Beacon transport and the page-unload registration made in AttachTracker: the JS-side reference
// back to this instance and the key it was registered under (the PlayerId at attach time).
private BeaconInterop? _beacon;
private DotNetObjectReference<StreamingAudioPlayerService>? _unloadRef;
private string? _unloadKey;
// One-shot guard so the play session opens exactly once per LoadTrackStreaming — never on the
// SeekBeyondBuffer re-stream, which reuses _currentTrackId and re-runs the playback-start transition
// with _streamingPlaybackStarted reset. A seek-beyond-buffer is the SAME play (§1d), so it must not
// open a new session. Set true when the session opens; reset only by LoadTrackStreaming.
private bool _sessionOpened;
/// <summary>
/// Builds the streaming player on top of the shared interop and media client. The base class
/// receives both collaborators; this type only keeps the logger.
/// </summary>
/// <param name="audioInterop">JS audio interop, forwarded to the base player.</param>
/// <param name="trackMediaClient">Track media HTTP client, forwarded to the base player.</param>
/// <param name="logger">Logger for streaming diagnostics.</param>
public StreamingAudioPlayerService(
    AudioInteropService audioInterop,
    TrackMediaClient trackMediaClient,
    ILogger<StreamingAudioPlayerService> logger)
    : base(audioInterop, trackMediaClient) => _logger = logger;
/// <summary>
/// Post-construction binding of Phase 16 §2.1 telemetry: stores the play-session tracker and the
/// beacon transport, then registers a page-unload callback so a tab closed mid-play still reports
/// the session via sendBeacon. <c>AudioPlayerProvider</c> calls this once. The player is created
/// with <c>new</c> rather than resolved from DI, so the tracker is handed in here instead of
/// through the constructor — the same late binding QueueService uses.
/// </summary>
/// <param name="tracker">The play-session tracker that receives lifecycle events.</param>
/// <param name="beacon">The beacon interop used to register the unload handler.</param>
public void AttachTracker(PlayTracker tracker, BeaconInterop beacon)
{
    (_playTracker, _beacon) = (tracker, beacon);

    var callbackRef = DotNetObjectReference.Create(this);
    var registrationKey = PlayerId;
    _unloadRef = callbackRef;
    _unloadKey = registrationKey;

    // Deliberately not awaited. The registration only has to land before the listener leaves and
    // must never hold up playback; if it fails, a mid-play tab close simply goes unrecorded.
    _ = beacon.RegisterUnloadAsync(registrationKey, callbackRef, nameof(OnPageUnload));
}
/// <summary>
/// JS-invoked from the beacon's unload handler (pagehide / visibility→hidden). It closes the open
/// play session synchronously so the session beacon is queued before the page freezes. Because
/// <see cref="PlayTracker.Close"/> is idempotent, any later organic close does nothing.
/// </summary>
[JSInvokable]
public void OnPageUnload()
{
    if (_playTracker is { } tracker)
    {
        tracker.Close();
    }
}
// Each progress tick pushes the playhead to the play session (§2.1). The tracker keeps the
// maximum, so seeking backward never lowers the high-water mark.
protected override void OnProgressTick(double currentTime)
{
    if (_playTracker is { } tracker)
    {
        tracker.OnProgress(currentTime);
    }
}
// Natural end of stream ends the play session; its bucket reflects the high-water fraction reached.
protected override void OnPlaybackEnded()
{
    if (_playTracker is { } tracker)
    {
        tracker.Close();
    }
}
/// <summary>
/// Base-player selection entry point. This player always streams, so it forwards straight to
/// <see cref="SelectTrackStreaming"/>.
/// </summary>
public override async Task SelectTrack(TrackDto track) => await SelectTrackStreaming(track);
/// <inheritdoc />
public async Task WarmAudioContext()
{
    // Initialize first: the AudioContext is resumed for this player's id afterwards.
    await EnsureInitializedAsync();
    var playerId = PlayerId;
    await _audioInterop.EnsureAudioContextReady(playerId);
}
/// <summary>
/// User-initiated selection. It initializes the player, resumes the AudioContext while still
/// inside the user gesture (so later starts do not click), announces the selection, runs the
/// streaming load, and finally publishes the resulting state.
/// </summary>
/// <param name="track">The track to load and play.</param>
public async Task SelectTrackStreaming(TrackDto track)
{
    await EnsureInitializedAsync();

    // Resume the AudioContext now, during the user interaction, to avoid clicks later.
    var playerId = PlayerId;
    await _audioInterop.EnsureAudioContextReady(playerId);

    await NotifyTrackSelected();
    await LoadTrackStreaming(track);
    await NotifyStateChanged();
}
/// <inheritdoc />
public async Task StageTrack(TrackDto track)
{
    // State only. The bar shows the track as current and ready, but nothing is initialized,
    // resumed, or streamed here. Those steps need a user gesture and happen on the first play
    // click through SelectTrackStreaming.
    CurrentTrack = track;
    ErrorMessage = null;

    await NotifyStateChanged();
}
/// <summary>
/// Runs one streaming load for <paramref name="track"/>.
/// <list type="number">
/// <item>Drops a duplicate same-track dispatch.</item>
/// <item>Resets the player.</item>
/// <item>Resolves the delivery format.</item>
/// <item>Fetches the first bounded segment.</item>
/// <item>Initializes the JS decoder with the total length.</item>
/// <item>Drives the segment loop to completion.</item>
/// </list>
/// A newer load, a seek, or a stop may supersede this load at any await. After every await it
/// re-checks its own cancellation token before touching shared state, so a stale continuation
/// cannot clobber the operation that replaced it (format, error message, decoder init, active task).
/// </summary>
/// <param name="track">The track to stream.</param>
private async Task LoadTrackStreaming(TrackDto track)
{
    // Idempotency guard (single-load invariant). A load already in flight for THIS SAME track means
    // a duplicate dispatch of one play action — a UI double-fire, a queue re-entry, or a JS false-end
    // that auto-advanced back onto the same track. Dropping it collapses the play to exactly one load
    // without touching the live operation. A load for a DIFFERENT track is a real switch and falls
    // through to supersede the in-flight one, so this never blocks navigation. The check runs on the
    // single-threaded WASM dispatcher, so reading/writing _loadInFlightTrackId needs no lock. The
    // field is set below before the first await and released in finally by the newest load.
    if (_loadInFlightTrackId is { } inFlight && inFlight == track.EntryKey)
    {
        _logger.LogInformation(
            "Streaming load for track {TrackId} skipped — a load for the same track is already in flight (single-load guard)",
            track.EntryKey);
        return;
    }

    _loadInFlightTrackId = track.EntryKey;

    // Always reset to a clean state before loading. ResetToIdle both cancels and awaits any in-flight
    // streaming loop, so the previous loop has exited when it returns. There is then no risk of
    // interleaved ProcessStreamingChunk calls against the single-instance JS StreamDecoder.
    // clearLoadGuard:false — _loadInFlightTrackId was just armed for THIS load; the prologue reset
    // must not wipe it.
    await ResetToIdle(clearLoadGuard: false);

    // Stamp this load with a fresh generation id, logged at start and finish so a double-load shows
    // as two overlapping start/finish pairs for one play. Besides that diagnostic role, the counter
    // is the "is a newer load running?" test in finally. It only increases, and a newer load bumps it
    // synchronously just before setting IsLoading = true (no await in between).
    var loadGeneration = ++_loadGeneration;
    _logger.LogInformation("Streaming load #{Gen} started for track {TrackId}", loadGeneration, track.EntryKey);

    // Save the track id for seek operations.
    _currentTrackId = track.EntryKey;

    // A fresh load is a fresh play candidate (§1d: replays = multiple plays). Arm the one-shot
    // session-open guard. The session actually opens at the playback-start transition (a track that
    // fails to load never reaches it, so it does not count).
    _sessionOpened = false;

    // Expose to the UI immediately so Now-Playing surfaces show the track while it is still loading.
    CurrentTrack = track;

    // New CTS for this operation, captured locally so catch/finally can compare identity with
    // _streamingCancellation. A seek replaces _streamingCancellation with its own CTS, and this
    // load must not clobber the seek's state.
    var loadCts = new CancellationTokenSource();
    _streamingCancellation = loadCts;
    var cancellationToken = loadCts.Token;

    // Waveform profile, fire-and-forget on the same token so a track switch abandons it. It only
    // updates display state and must never gate or fail the audio load (a missing profile yields
    // the flat-seekbar fallback).
    _ = LoadWaveformProfileAsync(track.EntryKey, cancellationToken);

    try
    {
        // Loading has started.
        ErrorMessage = null;
        LoadProgress = 0;
        IsLoading = true;
        IsStreamingMode = true;

        // Reset adaptive buffer sizing.
        _currentBufferSize = DefaultBufferSize;
        _consecutiveSlowReads = 0;

        await NotifyStateChanged();

        // From here on, every await is followed by a supersede check. ThrowIfCancellationRequested
        // routes a superseded load into the OCE handler below, which already applies the
        // identity-guarded cleanup.
        cancellationToken.ThrowIfCancellationRequested();

        // Resolve the delivery format BEFORE requesting bytes (Phase 18, default policy OQ2). When
        // Opus is chosen, the sidecar is injected into the JS player ahead of InitializeStreaming
        // (the 18.4 set-before-init contract). The result is kept for the seek-beyond-buffer
        // re-fetch. It is only stored if this load was not superseded while resolving; otherwise a
        // stale load would overwrite the newer track's format.
        var format = await ResolveStreamFormatAsync(track.EntryKey, cancellationToken);
        cancellationToken.ThrowIfCancellationRequested();
        _currentFormat = format;

        // Direction B: fetch the FIRST bounded segment to learn the total file length and the
        // content type. The 206 Content-Range carries the total, and the segment loop advances its
        // cursor toward it. The decoder is initialized with the TOTAL length (not the segment
        // length) so a bounded segment's small Content-Length never trips its byte-count completion
        // early. Passing the token aborts the server connection on a navigation/track switch.
        var firstSegment = await _trackMediaClient.GetTrackMedia(
            track.EntryKey,
            byteOffset: 0,
            byteEnd: SegmentSizeBytes - 1,
            format: format,
            cancellationToken: cancellationToken);

        if (cancellationToken.IsCancellationRequested)
        {
            // Superseded while the request was in flight. Release any response we were handed and
            // leave through the OCE handler. Reporting the cancellation as a load error would write
            // ErrorMessage over the operation that replaced us.
            if (firstSegment.Success)
            {
                firstSegment.Value?.Dispose();
            }
            cancellationToken.ThrowIfCancellationRequested();
        }

        if (!firstSegment.Success)
        {
            var technicalError = firstSegment.GetMessage();
            _logger.LogError("Failed to get track media for {TrackId}: {Error}",
                track.EntryKey, technicalError);
            ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
            return;
        }

        if (firstSegment.Value == null)
        {
            const string technicalError = "No audio returned from server";
            _logger.LogError("No audio data returned for track {TrackId}", track.EntryKey);
            ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
            return;
        }

        // Ownership of the first segment passes to the segment loop, which disposes it and every
        // later segment. No `using` here, which avoids a double dispose.
        var audio = firstSegment.Value;

        // The total logical length the decoder must see. A 206 carries it in Content-Range. A 200
        // (Range ignored / file ≤ one segment) has none, and that body IS the whole file.
        var totalLength = audio.TotalLength ?? audio.ContentLength;

        // Initialize streaming with the TOTAL length and the media type (drives JS format-decoder
        // selection).
        var streamingResult = await _audioInterop.InitializeStreaming(PlayerId, totalLength, audio.ContentType);

        if (cancellationToken.IsCancellationRequested)
        {
            // Superseded during init. Do not start a segment loop or reassign _activeStreamingTask;
            // the newer operation owns both.
            audio.Dispose();
            cancellationToken.ThrowIfCancellationRequested();
        }

        if (!streamingResult.Success)
        {
            audio.Dispose();
            var technicalError = $"Failed to initialize streaming: {streamingResult.Error}";
            _logger.LogError("Streaming initialization failed for track {TrackId}: {Error}",
                track.EntryKey, technicalError);
            ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
            return;
        }

        // Forward segmentation from byte 0. The loop pumps the first segment, then fetches later
        // bounded segments gated on the scheduler fill signal.
        _activeStreamingTask = RunSegmentedStreamAsync(
            track.EntryKey, audio, cursor: 0, totalLength, seekPosition: null, cancellationToken);
        await _activeStreamingTask;
    }
    catch (OperationCanceledException) when (loadCts.IsCancellationRequested)
    {
        // Expected when this load was superseded (track switch, seek, or stop). The filter lets
        // HttpClient timeout OCEs — where loadCts was NOT cancelled — reach the error handler below.
        _logger.LogDebug("Audio streaming cancelled for track {TrackId}", track.EntryKey);

        // Only reset streaming state if this load is still the active operation. A seek in flight
        // has already replaced _streamingCancellation and owns IsLoaded/IsStreamingMode.
        if (ReferenceEquals(_streamingCancellation, loadCts))
        {
            IsLoaded = false;
            IsStreamingMode = false;
        }
    }
    catch (Exception ex)
    {
        StreamingErrorHandler.LogError(_logger, ex, "LoadTrackStreaming", track.EntryKey);
        var userError = StreamingErrorHandler.GetUserFriendlyMessage(ex.Message);

        if (_streamingPlaybackStarted && ReferenceEquals(_streamingCancellation, loadCts))
        {
            // Mid-stream failure while still active: halt the JS scheduler into a clean
            // paused-but-loaded state, exactly as the seek path does via RecoverFromFailedRefill.
            // This avoids letting the buffered tail drain into a silent false end (AC6).
            await RecoverFromFailedRefill(CurrentTime, userError);
        }
        else if (ReferenceEquals(_streamingCancellation, loadCts))
        {
            // First-segment failure while still active: nothing is in the scheduler to halt, so
            // the normal unload-to-error path is correct.
            ErrorMessage = userError;
            LoadProgress = 0;
            IsLoaded = false;
            IsStreamingMode = false;
        }
        else
        {
            // Superseded load: a newer operation owns all shared state, so do nothing to it.
            _logger.LogDebug("Generic throw on superseded load for track {TrackId} — newer operation owns state, skipping unload", track.EntryKey);
        }
    }
    finally
    {
        // "Active operation" = still owns _streamingCancellation (not replaced by a seek or a load).
        // "Newest load" = no later LoadTrackStreaming has passed the guard since this one. A seek or
        // stop supersedes the operation but not the load; a newer load supersedes both.
        var isActiveOperation = ReferenceEquals(_streamingCancellation, loadCts);
        var isNewestLoad = loadGeneration == _loadGeneration;

        // A newer load set IsLoading = true and clears it in its own finally. Clearing it here
        // would hide that load's loading state while it is still fetching.
        if (isNewestLoad)
        {
            IsLoading = false;
        }

        _logger.LogInformation("Streaming load #{Gen} finished for track {TrackId} (superseded={Superseded})",
            loadGeneration, track.EntryKey, !isActiveOperation);

        // Release the single-load guard when this load is the newest one and the guard still
        // carries this track's key. The key check matters for an incoming load that armed the guard
        // for a DIFFERENT track but has not bumped the generation yet (it is inside ResetToIdle);
        // its guard must stay latched. Unlike an identity-only test, this also releases the guard
        // when a seek superseded the operation. Otherwise the guard would stay set after this load
        // ends and silently drop every later same-track selection.
        if (isNewestLoad && _loadInFlightTrackId == track.EntryKey)
        {
            _loadInFlightTrackId = null;
        }

        // Only notify if this load is still the active operation. A superseding seek owns state
        // notifications, and firing here mid-seek would push a stale snapshot.
        if (isActiveOperation)
        {
            await NotifyStateChanged();
        }
    }
}
/// <summary>
/// Chooses the delivery format for a load (Phase 18 default policy, OQ2). The result is
/// <see cref="AudioFormat.Opus"/> only when all three conditions hold:
/// <list type="bullet">
/// <item>the browser can decode Ogg Opus;</item>
/// <item>the track has a non-empty Opus sidecar;</item>
/// <item>the JS player accepts that sidecar.</item>
/// </list>
/// Anything else yields <see cref="AudioFormat.Lossless"/>. On the Opus path the sidecar is handed
/// to the JS player here, before <c>InitializeStreaming</c> builds the decoder (the 18.4
/// set-before-init contract).
/// <para>
/// This is the overridable seam for the listener's streaming-quality preference (wave 18.6). An
/// override may force lossless, and otherwise defers to this method. The capability gate (AC7) and
/// the missing-sidecar → lossless fallback (C2) live here, so every override inherits them: a
/// browser without Opus support, or a track without a sidecar, always plays lossless.
/// </para>
/// </summary>
/// <param name="entryKey">The track's entry key.</param>
/// <param name="cancellationToken">Cancels the sidecar fetch.</param>
/// <returns>The format the load should request.</returns>
protected virtual async Task<AudioFormat> ResolveStreamFormatAsync(string entryKey, CancellationToken cancellationToken)
{
    // AC7: an Opus-incapable browser is never handed Ogg Opus.
    var browserDecodesOpus = await _audioInterop.CanDecodeOggOpus();
    if (!browserDecodesOpus)
    {
        return AudioFormat.Lossless;
    }

    // Seeking an Opus stream requires its sidecar. None means no Opus artifact exists yet (legacy,
    // not backfilled, or transcode failed). Asking for lossless directly keeps the request honest
    // and skips an Opus-then-fallback round-trip.
    var sidecarResult = await _trackMediaClient.GetOpusSidecarAsync(entryKey, cancellationToken);
    if (sidecarResult is not { Success: true, Value: { Length: > 0 } sidecarBytes })
    {
        return AudioFormat.Lossless;
    }

    // Set-before-init. Bytes the JS side cannot parse are not a usable sidecar, so a malformed one
    // degrades to lossless instead of breaking playback.
    var parseResult = await _audioInterop.SetOpusSidecar(PlayerId, sidecarBytes);
    if (parseResult.Success)
    {
        return AudioFormat.Opus;
    }

    _logger.LogWarning("Opus sidecar for {EntryKey} failed to parse ({Error}); falling back to lossless.",
        entryKey, parseResult.Error);
    return AudioFormat.Lossless;
}
/// <summary>
/// Best-effort fetch of the track's waveform loudness profile. On success the decoded profile is
/// stored in <see cref="AudioPlayerService.WaveformProfile"/> and state is re-notified so the seek
/// zone redraws with real bars. Any miss leaves the profile null, which WaveformSeeker draws as a
/// flat but still seekable bar; a miss is a 404, a cancelled fetch, or an unexpected error. This
/// method never throws into the load path.
/// </summary>
/// <param name="entryKey">The track's entry key.</param>
/// <param name="cancellationToken">The load's token; a track switch abandons the fetch.</param>
private async Task LoadWaveformProfileAsync(string entryKey, CancellationToken cancellationToken)
{
    WaveformProfile = null;
    try
    {
        var response = await _trackMediaClient.GetWaveformProfileAsync(entryKey, cancellationToken);

        // Nothing to show if the load moved on, or there is no usable payload.
        if (cancellationToken.IsCancellationRequested || !response.Success || response.Value is not { } profileDto)
        {
            return;
        }

        WaveformProfile = DecodeWaveformProfile(profileDto);
        await NotifyStateChanged();
    }
    catch (OperationCanceledException)
    {
        // The track changed or playback stopped before the profile arrived; nothing to surface.
    }
    catch (Exception ex)
    {
        // Playback must not be disturbed by a failed profile fetch; record it and keep flat bars.
        _logger.LogDebug(ex, "Failed to load waveform profile for {EntryKey}", entryKey);
    }
}
/// <summary>
/// Turns a <see cref="WaveformProfileDto"/> payload into a normalized double[] in [0, 1]. The
/// payload is base64 of one byte (0..255) per bucket.
/// </summary>
/// <param name="dto">The profile DTO whose <c>Data</c> holds the base64 payload.</param>
/// <returns>
/// One value per decoded byte (byte / 255.0). Returns null for a missing, empty, or non-base64
/// payload, so callers treat it as "no profile" instead of drawing garbage bars.
/// </returns>
private static double[]? DecodeWaveformProfile(WaveformProfileDto dto)
{
    if (string.IsNullOrEmpty(dto.Data))
    {
        return null;
    }

    byte[] rawLevels;
    try
    {
        rawLevels = Convert.FromBase64String(dto.Data);
    }
    catch (FormatException)
    {
        return null;
    }

    return rawLevels.Length == 0
        ? null
        : Array.ConvertAll(rawLevels, level => level / 255.0);
}
/// <summary>
/// Phase 21 Direction B — the single segmented forward read loop, shared by the initial load and
/// the seek/refill path (the convergence C1/C5 require: one cursor, one fetch mechanism, no forked
/// path). It pumps the FIRST segment (already fetched by the caller), then fetches subsequent
/// bounded <c>bytes=cursor-(cursor+SegmentSizeBytes-1)</c> 206 segments — each only AFTER the
/// scheduler drains below low-water — until the cursor reaches <paramref name="totalLength"/>.
/// Because each segment is bounded and fully consumed before the next is requested, the browser
/// holds at most ~one segment of raw bytes (the network-memory bound), while the decoder sees one
/// continuous in-order byte stream across segment boundaries (the demuxer/decoder buffer partial
/// frames/pages across the boundary exactly as for arbitrary chunks today — no per-segment reinit).
/// </summary>
/// <param name="firstSegment">The already-fetched first segment (byte <paramref name="cursor"/>).
/// Owned by this method, which disposes it; subsequent segments are fetched and disposed inline.</param>
/// <param name="cursor">File-absolute byte offset the first segment starts at (0 for a fresh load,
/// the resolved seek offset for a refill).</param>
/// <param name="totalLength">Total file length in bytes — the EOF boundary the cursor advances
/// toward. The decoder is initialized/reinitialized against this, not the per-segment length.</param>
/// <param name="seekPosition">Non-null for a seek/refill: the decoder is reinitialized for the
/// header-less Range continuation at this time before the first segment's bytes are fed (WAV
/// retains its header, Opus re-applies the cached setup + lead-trim). Null for a forward load from
/// byte 0, where the first segment carries the header and no reinit is needed.</param>
private async Task RunSegmentedStreamAsync(
string trackId,
TrackMediaResponse firstSegment,
long cursor,
long totalLength,
double? seekPosition,
CancellationToken cancellationToken)
{
byte[]? buffer = null;
var segment = firstSegment;
try
{
// Seek/refill: reinitialize the active decoder for the header-less continuation ONCE,
// before any continuation bytes are fed. Forward-from-zero (seekPosition null) skips this
// — its first segment carries the real header the decoder parses. Done here, inside the
// single loop, so seek and forward share the same fetch+pump mechanism (no forked path).
if (seekPosition is { } resumeAt)
{
// The decoder byte-counts the header-less continuation against the bytes REMAINING
// from the range start to EOF (total cursor), not the absolute total — that is what
// reinitializeForRangeContinuation expects (StreamDecoder.remainingByteLength). The
// loop's own cursor still targets the absolute totalLength for EOF.
var remainingBytes = Math.Max(0, totalLength - cursor);
var reinitResult = await _audioInterop.ReinitializeFromOffset(PlayerId, remainingBytes, resumeAt);
if (!reinitResult.Success)
{
throw new Exception($"Failed to reinitialize for offset streaming: {reinitResult.Error}");
}
}
buffer = ArrayPool<byte>.Shared.Rent(MaxBufferSize); // larger rental to fit adaptive sizing
var readTimer = System.Diagnostics.Stopwatch.StartNew();
// Segment loop. Each iteration fully consumes one bounded 206 body, advancing the cursor by
// the bytes received. The next segment is fetched only when the scheduler is below
// high-water (the inter-segment gate). EOF is the cursor reaching totalLength, or a short
// segment (server returned fewer bytes than requested — the final slice).
while (true)
{
long segmentBytesRead = 0;
int currentBytes;
do
{
readTimer.Restart();
currentBytes = await segment.Stream.ReadAsync(buffer, 0, _currentBufferSize, cancellationToken);
readTimer.Stop();
AdaptBufferSize(currentBytes, readTimer.ElapsedMilliseconds);
if (currentBytes > 0)
{
segmentBytesRead += currentBytes;
// Slice to the exact bytes read: the pooled buffer is rented at MaxBufferSize
// and may carry stale bytes past currentBytes from a prior rental — handing the
// full array to JS would serialise that garbage into the audio stream.
var actualBuffer = buffer.AsSpan(0, currentBytes).ToArray();
var chunkResult = await _audioInterop.ProcessStreamingChunk(PlayerId, actualBuffer);
if (!chunkResult.Success)
{
var error = $"Failed to process streaming chunk: {chunkResult.Error}";
_logger.LogWarning("Chunk processing failed: {Error}", error);
throw new Exception(error);
}
CanStartStreaming = chunkResult.CanStartStreaming;
HeaderParsed = chunkResult.HeaderParsed;
BufferedChunks = chunkResult.BufferCount;
// Set duration from header when available (only set once)
if (chunkResult.Duration.HasValue && Duration == null)
{
Duration = chunkResult.Duration.Value;
_logger.LogInformation("Duration set from header: {Duration:F2} seconds", Duration);
// Feed the once-only duration to the play session for the completion
// fraction. No-op when no session is open; idempotent otherwise.
_playTracker?.SetDuration(chunkResult.Duration.Value);
}
// Start playback as soon as we can — at the min-buffer threshold, exactly as
// before (C2: first audio is not gated on the segment boundary; the first
// segment alone clears the threshold).
if (!_streamingPlaybackStarted && CanStartStreaming)
{
var playbackResult = await _audioInterop.StartStreamingPlayback(PlayerId);
if (playbackResult.Success)
{
_streamingPlaybackStarted = true;
IsPlaying = true;
IsPaused = false;
IsLoaded = true; // loaded and ready, even while still downloading
ErrorMessage = null;
// Open the play session exactly once per load, at the moment playback
// truly begins (§2.1). The _sessionOpened guard keeps a seek/refill
// re-stream — which re-enters this transition with
// _streamingPlaybackStarted reset — from opening a second session for
// the same play. Duration may already be known, so re-feed it.
if (!_sessionOpened && _currentTrackId is { } trackKey)
{
_sessionOpened = true;
_playTracker?.OnPlaybackStarted(trackKey);
if (Duration is { } d)
_playTracker?.SetDuration(d);
}
await NotifyStateChanged(); // immediate — critical state change
}
else
{
var technicalError = $"Failed to start streaming playback: {playbackResult.Error}";
_logger.LogError("Failed to start playback: {Error}", technicalError);
ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
}
}
// Progress against the total file length (cursor + bytes consumed so far).
if (totalLength > 0)
{
LoadProgress = Math.Min(1.0, (double)(cursor + segmentBytesRead) / totalLength);
}
await ThrottledNotifyStateChanged();
// Per-chunk back-pressure — the bound that actually holds for high-density codecs.
// The inter-segment gate alone is matched to WAV's byte density (~24 s of audio per
// 4 MB segment) but NOT to Opus: at 320 kbps a 4 MB segment is ~100 s of decodable
// audio. The inner loop has the whole segment's bytes already in hand, so with no
// network wait to pace it, it would decode the ENTIRE segment eagerly — piling tens
// of MB of decoded f32 PCM AHEAD of a playhead that has barely moved, before the
// inter-segment gate ever runs. With HW accel off that lookahead lives in main-
// process RAM, and the byte ceiling cannot save us because nothing on this path
// polls it. So drain to low-water per chunk once the scheduler is over high-water.
//
// Gated on _streamingPlaybackStarted so this can NEVER block first audio (C2): until
// playback starts the playhead does not advance, so the forward fill would never
// drain and the loop would deadlock. The 30 s high-water sits far above the
// 6-buffer playback-start minimum, so in practice the gate is not even reached
// before playback begins — the guard is the correctness backstop, not the common
// case. Reads the piggybacked flag (no extra interop hop) to DECIDE to drain; the
// drain helper then polls IsProductionPaused — the same steady-state-reads-flag /
// throttled-state-polls split the inter-segment gate uses.
if (_streamingPlaybackStarted && chunkResult.ProductionPaused)
{
await DrainBackpressureAsync(cancellationToken);
}
}
} while (currentBytes > 0);
// Segment fully consumed; advance the cursor and release this segment's stream/socket
// before deciding whether to fetch the next. Disposing here keeps exactly one segment's
// raw bytes resident at a time.
cursor += segmentBytesRead;
segment.Dispose();
segment = null!;
// EOF: cursor reached the total file length. This is the sole forward-EOF condition.
// A short segment body (segmentBytesRead < SegmentSizeBytes) is NOT an EOF signal —
// the inner read loop fully drains the HTTP body, so a short body means the server
// sent fewer bytes than the bounded range we requested. While cursor < totalLength that
// can only be a connection drop / truncated stream, NOT the file tail — route it to
// the same clean-failure recovery as a fetch error rather than silently completing.
var reachedTotal = totalLength > 0 && cursor >= totalLength;
if (reachedTotal)
{
break;
}
// Guard: if the body was short but we haven't reached totalLength, the stream was
// truncated mid-segment (connection drop / premature close). Surface as an error so
// the scheduler is halted rather than left to drain its buffered tail into a false end.
if (segmentBytesRead < SegmentSizeBytes)
{
throw new Exception(
$"Stream truncated at byte {cursor} of {totalLength}: received {segmentBytesRead} bytes " +
$"but expected up to {SegmentSizeBytes} and have not reached EOF");
}
// Inter-segment back-pressure gate (Phase 21.2 fill signal, gating SEGMENT FETCH). Do not
// fetch the next segment while the scheduler is over high-water; wait until it drains
// below low-water. Because the browser only buffers bounded segments and we hold off
// requesting the next one, raw network memory stays at ~one segment. Shares the same
// drain helper as the per-chunk gate above. No _streamingPlaybackStarted guard is needed
// here (unlike the per-chunk gate): reaching this point means a full segment was consumed,
// which is ~24 s (WAV) / ~100 s (Opus) of audio — far past the 6-buffer playback-start
// minimum — so playback is always running by now and the fill can drain. A file that fits
// in one segment hits EOF and breaks above, never reaching this gate.
await DrainBackpressureAsync(cancellationToken);
// Fetch the next bounded segment. The end offset is clamped implicitly by the server
// (a request past EOF yields the available tail as a short slice, caught above).
var nextEnd = cursor + SegmentSizeBytes - 1;
var nextResult = await _trackMediaClient.GetTrackMedia(
trackId,
byteOffset: cursor,
byteEnd: nextEnd,
format: _currentFormat,
cancellationToken: cancellationToken);
if (!nextResult.Success || nextResult.Value == null)
{
var technicalError = nextResult.GetMessage() ?? "Failed to fetch next stream segment";
_logger.LogError("Failed to fetch segment at offset {Offset} for {TrackId}: {Error}",
cursor, trackId, technicalError);
throw new Exception(technicalError);
}
segment = nextResult.Value;
}
// Notify the JS decoder that the stream is finished. The decoder marks completion by byte
// count against the total it was initialized with; this explicit signal flushes the
// residual tail and covers the (rare) case where the total was unknown.
await _audioInterop.MarkStreamCompleteAsync(PlayerId);
LoadProgress = 1.0;
await NotifyStateChanged();
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// Cancellation is expected during track switch or seek — propagate cleanly.
throw;
}
catch (Exception ex)
{
StreamingErrorHandler.LogError(_logger, ex, "RunSegmentedStreamAsync");
ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(ex.Message);
LoadProgress = 0;
IsLoaded = false;
IsStreamingMode = false;
await NotifyStateChanged();
throw;
}
finally
{
// Release the last segment (if a fetch failed mid-loop it may still be held) and the buffer.
segment?.Dispose();
if (buffer != null)
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
}
/// <summary>
/// Stops streaming playback. Streamed audio is consumed as it plays and cannot be replayed, so in
/// streaming mode a stop tears the player all the way down to Idle — the same teardown
/// <see cref="Unload"/> performs.
/// </summary>
public override async Task Stop() => await ResetToIdle();
/// <summary>
/// Returns the player to Idle so it is ready for a new track: cancels and drains any in-flight
/// stream, stops/unloads the JS player, and clears all playback and streaming state.
/// </summary>
public override async Task Unload() => await ResetToIdle();
/// <summary>
/// Streaming-aware seek. The JS player first tries to satisfy the seek from decoded audio it already
/// holds. When it reports that the target lies beyond that buffer and supplies a byte offset, the
/// stream is re-fetched from that offset via <c>SeekBeyondBuffer</c>. Seeks are ignored unless a
/// track is loaded in streaming mode.
/// </summary>
/// <param name="position">Target playback position, in seconds.</param>
public override async Task Seek(double position)
{
    if (!IsLoaded || !IsStreamingMode)
    {
        return;
    }

    try
    {
        var seekResult = await _audioInterop.SeekAsync(PlayerId, position);

        if (!seekResult.Success)
        {
            ErrorMessage = $"Seek error: {seekResult.Error}";
            await NotifyStateChanged();
            return;
        }

        // NOTE(review): SeekBeyondBuffer == true with a negative ByteOffset falls through to the
        // "within buffer" path below and is reported as a successful seek — confirm that is what
        // the JS side intends for an unresolvable offset.
        var needsRefetch = seekResult.SeekBeyondBuffer && seekResult.ByteOffset >= 0;
        if (needsRefetch)
        {
            // Target is outside the decoded window — start a new stream at the resolved offset.
            _logger.LogInformation("Seeking beyond buffer to {Position:F2}s, byte offset: {ByteOffset}",
                position, seekResult.ByteOffset);
            await SeekBeyondBuffer(position, seekResult.ByteOffset);
            return;
        }

        // The JS player handled the seek from audio it already had.
        CurrentTime = position;
        ErrorMessage = null;
        await NotifyStateChanged();
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error seeking to position {Position}", position);
        ErrorMessage = $"Error seeking: {ex.Message}";
        await NotifyStateChanged();
    }
}
/// <summary>
/// Handle seeking beyond the currently buffered content by requesting a new stream from offset.
/// Cancels and drains the current streaming loop, fetches the first bounded segment at
/// <paramref name="byteOffset"/>, and hands it to the shared segment loop. Every exit that touches
/// shared player state first checks that this seek is still the active one (its CTS is still
/// <c>_streamingCancellation</c>). A seek superseded by a newer seek or by a reset never mutates
/// state the newer operation owns, and never starts a competing loop.
/// </summary>
/// <param name="seekPosition">Target playback position, in seconds.</param>
/// <param name="byteOffset">Absolute byte offset (resolved JS-side) at which the continuation fetch starts.</param>
private async Task SeekBeyondBuffer(double seekPosition, long byteOffset)
{
    if (string.IsNullOrEmpty(_currentTrackId))
    {
        ErrorMessage = "Cannot seek - no track loaded";
        // Seek does not notify after this returns; without this the error never reaches the UI.
        await NotifyStateChanged();
        return;
    }

    // Capture into a non-null local: _currentTrackId is the field a track-switch could clear, but
    // this seek operates against the track loaded NOW; the segment loop needs a stable id.
    var trackId = _currentTrackId;
    IsSeekingBeyondBuffer = true;

    // Cancel the current streaming loop AND wait for it to fully exit before
    // starting a new one. The previous loop's pending ReadAsync will throw
    // OperationCanceledException asynchronously; if we kick off a new loop
    // immediately, both can race against the single-instance JS StreamDecoder
    // and corrupt decode state. Draining here is the load-bearing guarantee.
    //
    // Invariant: any caller that supersedes a load WITHOUT wanting the load's
    // state reset must assign its own CTS to _streamingCancellation *before*
    // its first await. LoadTrackStreaming's OCE continuation fires during the
    // drain await on the shared _activeStreamingTask; it resets IsLoaded/
    // IsStreamingMode only when _streamingCancellation still equals its loadCts.
    // Assigning seekCts synchronously here makes that identity check fail, so
    // the seek's state survives. (ResetToIdle deliberately does NOT do this —
    // it wants the reset, and nulls _streamingCancellation only after the drain.)
    var oldCts = _streamingCancellation;
    var seekCts = new CancellationTokenSource();
    // Capture the token while seekCts is guaranteed undisposed. A superseding seek disposes this
    // CTS as its oldCts, and ResetToIdle disposes whatever _streamingCancellation holds. Reading
    // seekCts.Token after either would throw ObjectDisposedException instead of taking the
    // orderly superseded path. The token itself stays usable (and cancelled) after disposal.
    var seekToken = seekCts.Token;
    _streamingCancellation = seekCts;
    oldCts?.Cancel();
    await DrainActiveStreamingTaskAsync();
    oldCts?.Dispose();

    // Single-writer discipline (C6/AC8): every exit that touches shared state shares this guard.
    // TrackMediaClient.GetTrackMedia swallows OperationCanceledException and returns
    // Success==false, so a superseded seek lands in the media-fetch-fail branch below
    // rather than in the OCE catch. Without the guard those branches would call
    // RecoverFromFailedRefill — running clearForSeek + setPlaybackOffset against the player
    // state the NEWER seek now owns. A local predicate keeps all exits symmetric so a
    // future exit cannot forget the check.
    bool IsStillActiveSeek() => ReferenceEquals(_streamingCancellation, seekCts);

    try
    {
        // Update UI immediately
        CurrentTime = seekPosition;
        await NotifyStateChanged();

        // Request the FIRST bounded segment from the resolved offset (Direction B — converged with
        // the forward path). Reuse the format the initial load resolved to (_currentFormat): an
        // Opus seek must come back as Opus bytes so the cached setup header + page-aligned
        // byteOffset (resolved JS-side from the Opus seek index) match the continuation; WAV resolves
        // its offset from the header — one seam, format-appropriate math (AC9 / §3.4a C). The
        // segment loop then continues forward segmentation from this offset exactly as a fresh load
        // does from 0 — no forked fetch path (C1/C5).
        var firstSegment = await _trackMediaClient.GetTrackMedia(
            trackId,
            byteOffset,
            byteEnd: byteOffset + SegmentSizeBytes - 1,
            format: _currentFormat,
            cancellationToken: seekToken);

        if (!firstSegment.Success || firstSegment.Value == null)
        {
            var technicalError = firstSegment.GetMessage() ?? "Failed to load audio from position";
            _logger.LogError("Failed to get track media from offset {Offset}: {Error}", byteOffset, technicalError);
            // Guard: a superseded seek must NOT touch shared state. The newer seek owns teardown.
            if (IsStillActiveSeek())
            {
                await RecoverFromFailedRefill(seekPosition, StreamingErrorHandler.GetUserFriendlyMessage(technicalError));
            }
            else
            {
                _logger.LogDebug("Media-fetch failed on superseded seek to {Position} — newer seek owns state, skipping recovery", seekPosition);
            }
            return;
        }

        var audio = firstSegment.Value;

        // Guard: superseded while the fetch was in flight, yet the fetch still succeeded (the
        // response can complete before the cancel is observed). Going on would reset the newer
        // seek's streaming flags below and start a second loop racing it against the single JS
        // decoder. It would also never hand `audio` to a loop that disposes it. Release it and exit.
        if (!IsStillActiveSeek())
        {
            audio.Dispose();
            _logger.LogDebug("Seek to {Position} superseded after its first segment arrived — discarding segment", seekPosition);
            return;
        }

        // The absolute EOF boundary the segment loop's cursor targets. On a 206 the Content-Range
        // carries the file total; on a 200 (single-segment file) fall back to cursor + body length.
        var totalLength = audio.TotalLength ?? (byteOffset + audio.ContentLength);

        // Reset streaming state for the new stream. The decoder reinit for the header-less
        // continuation happens INSIDE RunSegmentedStreamAsync (seekPosition non-null), so seek and
        // forward share one fetch+pump+reinit mechanism. A reinit failure there throws and lands in
        // the catch below, which recovers when still the active seek — the same clean-failure path
        // (AC6) as a fetch failure.
        _streamingPlaybackStarted = false;
        CanStartStreaming = false;
        HeaderParsed = false;
        BufferedChunks = 0;

        // Stream from offset via the shared segment loop. Ownership of `audio` transfers to it.
        _activeStreamingTask = RunSegmentedStreamAsync(
            trackId, audio, cursor: byteOffset, totalLength, seekPosition, seekToken);
        await _activeStreamingTask;
        IsSeekingBeyondBuffer = false;
    }
    catch (OperationCanceledException) when (seekCts.IsCancellationRequested)
    {
        // Another seek or stop interrupted this one. Only clear the flag if we are
        // still the active seek — if _streamingCancellation has been replaced, a
        // newer seek is in progress and owns the flag.
        _logger.LogDebug("Seek beyond buffer cancelled");
        if (IsStillActiveSeek())
        {
            IsSeekingBeyondBuffer = false;
        }
    }
    catch (Exception ex)
    {
        // A refill fetch can fail deep into a long mix (the listener didn't initiate it). Recover
        // into a clean paused-but-loaded state (AC6) rather than leaving the starved scheduler to
        // fire a silent false end. Only when we are still the active seek — a superseding seek owns
        // the state and the OCE catch above handles its own teardown.
        _logger.LogError(ex, "Error during seek beyond buffer to position {Position}", seekPosition);
        if (IsStillActiveSeek())
        {
            await RecoverFromFailedRefill(seekPosition, StreamingErrorHandler.GetUserFriendlyMessage(ex.Message));
        }
    }
}
/// <summary>
/// Clean-failure recovery for a window-miss refill (Phase 21.3 / AC6).
/// <para>
/// A backward seek past the retained tail re-fetches via the Range path. The listener did not
/// initiate that mid-stream fetch, and it can fail deep into a long mix. By then the pre-seek loop
/// has already been cancelled and drained, but the JS scheduler still holds stale pre-seek buffers
/// and still believes it is playing. Left alone, it would drain them and fire a silent false end.
/// </para>
/// <para>
/// This method halts the scheduler into a paused-but-loaded state at <paramref name="seekPosition"/>,
/// surfaces <paramref name="userFacingError"/>, and keeps the track loaded so the listener can retry
/// the seek, resume, or pick another track. It mirrors <c>PlaybackScheduler.playFromPosition</c>'s
/// end-of-buffer recovery: stop pretending to play.
/// </para>
/// </summary>
/// <param name="seekPosition">Position (seconds) the failed seek targeted; the player is parked here.</param>
/// <param name="userFacingError">Message assigned to <c>ErrorMessage</c> for display.</param>
private async Task RecoverFromFailedRefill(double seekPosition, string userFacingError)
{
    // JS-side halt: stop sources, drop stale buffers, anchor at the target. Best-effort. A failure
    // here is only logged; the C# state below is settled regardless.
    var haltResult = await _audioInterop.RecoverFromFailedRefill(PlayerId, seekPosition);
    if (!haltResult.Success)
    {
        _logger.LogWarning("Refill-failure recovery interop did not succeed: {Error}", haltResult.Error);
    }

    // Paused-but-loaded. IsLoaded and IsStreamingMode MUST remain true: Seek early-returns unless
    // both are set, so clearing either would block one of the retry routes (re-seek, resume via
    // TogglePlayPause, or choosing another track).
    ErrorMessage = userFacingError;
    IsPlaying = false;
    IsPaused = true;
    IsLoaded = true;
    IsStreamingMode = true;
    CurrentTime = seekPosition;
    IsSeekingBeyondBuffer = false;
    await NotifyStateChanged();
}
/// <summary>
/// Single method to reset all state - called by both Stop and Unload, and as the prologue of a new
/// load. <paramref name="clearLoadGuard"/> is true for the direct stop/unload/dispose callers and
/// false when a fresh <see cref="LoadTrackStreaming"/> calls it: that load has already set
/// <c>_loadInFlightTrackId</c> to its own key to arm the single-load guard, so the prologue must not
/// wipe it. The direct callers DO clear it so a later replay of the same track is not wrongly
/// suppressed by a guard key left over from an interrupted in-flight load (whose CTS-identity check
/// in finally fails after ResetToIdle nulls the CTS, so the load itself never clears the field).
/// <para>
/// Order matters: close the play session, cancel and drain the streaming loop, then stop/unload
/// the JS player, then reset C# state, then notify once. The JS teardown is best-effort; its
/// exceptions are swallowed.
/// </para>
/// </summary>
/// <param name="clearLoadGuard">
/// When true (default), also clears <c>_loadInFlightTrackId</c>. Pass false from the load prologue.
/// </param>
private async Task ResetToIdle(bool clearLoadGuard = true)
{
// 0. Close any open play session BEFORE tearing down (§2.1). ResetToIdle is the single funnel
// for stop / unload / dispose / track-switch (a new LoadTrackStreaming calls it first), so a
// superseded listen is recorded here with its high-water bucket. Close is idempotent — if the
// session already closed organically or via the unload beacon, this is a no-op.
_playTracker?.Close();
_sessionOpened = false;
// 1. Cancel any ongoing streaming operation and wait for it to exit
// before tearing down JS state. Otherwise the loop's pending
// ProcessStreamingChunk call can land after StopAsync/UnloadAsync.
// NOTE(review): _streamingCancellation is re-read after the drain await below. If another
// caller assigns a fresh CTS to the field while the drain is pending (SeekBeyondBuffer does so
// synchronously before its first await), that newer CTS is disposed and nulled here without
// ever being cancelled. Confirm this interleaving cannot occur, or that aborting the newer
// operation this way is intended.
_streamingCancellation?.Cancel();
await DrainActiveStreamingTaskAsync();
_streamingCancellation?.Dispose();
_streamingCancellation = null;
// 2. Tell JS to stop and unload
try
{
await _audioInterop.StopAsync(PlayerId);
await _audioInterop.UnloadAsync(PlayerId);
}
catch
{
// Ignore JS errors during cleanup
}
// 3. Reset ALL state to Idle
IsPlaying = false;
IsPaused = false;
IsLoaded = false;
IsLoading = false;
CurrentTime = 0;
Duration = null;
LoadProgress = 0;
ErrorMessage = null;
CurrentTrack = null;
WaveformProfile = null;
// 4. Reset streaming-specific state
IsStreamingMode = false;
CanStartStreaming = false;
HeaderParsed = false;
BufferedChunks = 0;
_streamingPlaybackStarted = false;
IsSeekingBeyondBuffer = false;
_currentTrackId = null;
// Format falls back to Lossless until the next load resolves its own.
_currentFormat = AudioFormat.Lossless;
// Direct stop/unload/dispose: release the single-load guard so a later replay of the same track
// is not suppressed. NOT cleared on the load prologue (clearLoadGuard:false) — that load owns the
// key it just armed.
if (clearLoadGuard)
_loadInFlightTrackId = null;
// 5. Single UI notification after all state has settled.
await NotifyStateChanged();
}
/// <summary>
/// Awaits the currently recorded streaming loop until it has fully exited. The caller is
/// responsible for cancelling <see cref="_streamingCancellation"/> first. Every outcome of the
/// loop is swallowed: cancellation is the expected result, and any other failure has already been
/// logged by the loop's own catch block. Afterwards the task slot is cleared, but only if no
/// other caller has installed a newer loop in the meantime.
/// </summary>
private async Task DrainActiveStreamingTaskAsync()
{
    var pending = _activeStreamingTask;
    if (pending is null)
    {
        return;
    }

    try
    {
        await pending;
    }
    catch
    {
        // OperationCanceledException is expected because we cancelled the loop ourselves.
        // Anything else was already logged inside the loop.
    }
    finally
    {
        // A concurrent caller may have started a new stream while we drained the old one.
        // Leave its task in place.
        if (ReferenceEquals(_activeStreamingTask, pending))
        {
            _activeStreamingTask = null;
        }
    }
}
/// <summary>
/// Block the segment loop while the scheduler's decoded forward fill is over high-water, resuming
/// once it drains below low-water (Phase 21.2 hysteresis). Shared by the per-chunk gate (inside a
/// segment) and the inter-segment gate so both honor identical drain discipline — a guard present on
/// one path and absent on the other would let one path overshoot the memory bound.
/// <para>
/// UC5: a user pause freezes the playhead, so the fill never drains on its own. While
/// <c>IsPaused</c> is set this waits without polling JS at all. It returns only once playback is
/// NOT paused AND <c>IsProductionPaused</c> reports false. When neither holds it returns after a
/// single check (the steady-state common case).
/// </para>
/// <para>
/// Cancellation is checked before every poll and once more on exit, so a track switch/seek that
/// lands at any point surfaces as <see cref="OperationCanceledException"/>. That includes one that
/// lands during the <c>IsProductionPaused</c> interop await. It then unwinds through the existing
/// drain discipline (C6). Without the exit check the caller would proceed to fetch the next
/// segment with an already-cancelled token. <c>GetTrackMedia</c> swallows that cancellation into a
/// plain failure result, which the segment loop would misreport as a stream error instead of a
/// clean cancel.
/// </para>
/// </summary>
/// <param name="cancellationToken">The segment loop's token; cancellation throws.</param>
/// <exception cref="OperationCanceledException">The token was cancelled before or during the wait.</exception>
private async Task DrainBackpressureAsync(CancellationToken cancellationToken)
{
    while (true)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // Short-circuit: while the user has paused, skip the interop poll entirely.
        var mustHold = IsPaused || await _audioInterop.IsProductionPaused(PlayerId);
        if (!mustHold)
        {
            break;
        }

        await Task.Delay(BackpressurePollMs, cancellationToken);
    }

    // Cancellation may have landed during the final IsProductionPaused await.
    cancellationToken.ThrowIfCancellationRequested();
}
/// <summary>
/// Rate-limited <c>NotifyStateChanged</c>. It forwards at most one notification per
/// <see cref="NotificationThrottleMs"/> window (measured against UTC wall-clock time) and silently
/// drops calls that arrive sooner than that.
/// </summary>
private async Task ThrottledNotifyStateChanged()
{
    var timestamp = DateTime.UtcNow;
    var sinceLastMs = (timestamp - _lastNotification).TotalMilliseconds;
    if (sinceLastMs < NotificationThrottleMs)
    {
        // Too soon after the previous notification — drop this one.
        return;
    }

    _lastNotification = timestamp;
    await NotifyStateChanged();
}
/// <summary>
/// Component-unmount teardown. The in-flight streaming loop must be cancelled and the JS side torn
/// down before the JS setInterval can fire again against a stale DotNetObjectReference.
/// <c>ResetToIdle</c> handles cancellation, the JS stop, and the state reset, and it also closes
/// any open play session, so a dispose mid-play still records the listen. The page-unload handler
/// is then detached and the self-reference released, and finally the base disposes the JS player
/// and its callbacks. Every step before the base call is best-effort and never throws.
/// </summary>
public override async ValueTask DisposeAsync()
{
    try
    {
        await ResetToIdle();
    }
    catch
    {
        // Disposal must not throw; any failure here is best-effort cleanup.
    }

    // Unregister the page-unload handler so it can never call into a torn-down circuit.
    // The JS side tolerates a key that is already gone.
    if (_unloadKey is { } key && _beacon is { } beacon)
    {
        try
        {
            await beacon.UnregisterUnloadAsync(key);
        }
        catch
        {
            // best-effort
        }
    }

    _unloadRef?.Dispose();
    _unloadRef = null;

    await base.DisposeAsync();
}
/// <summary>
/// Adjusts the network read buffer size based on how long the last read took.
/// <list type="bullet">
/// <item>Three consecutive slow reads (&gt;100 ms) halve the buffer, never below <see cref="MinBufferSize"/>.</item>
/// <item>A fast read (&lt;20 ms) that filled the whole buffer doubles it, never above <see cref="MaxBufferSize"/>.</item>
/// <item>Any read that is not slow resets the slow-read streak.</item>
/// </list>
/// </summary>
/// <param name="bytesRead">Bytes returned by the read just completed.</param>
/// <param name="readTimeMs">Wall time that read took, in milliseconds.</param>
private void AdaptBufferSize(int bytesRead, long readTimeMs)
{
    if (readTimeMs > 100)
    {
        // Slow read: only shrink once the streak reaches three and there is room to shrink.
        _consecutiveSlowReads++;
        if (_consecutiveSlowReads < 3 || _currentBufferSize <= MinBufferSize)
        {
            return;
        }

        _currentBufferSize = Math.Max(MinBufferSize, _currentBufferSize / 2);
        _consecutiveSlowReads = 0;
        return;
    }

    // Not a slow read, so the slow streak is broken.
    _consecutiveSlowReads = 0;

    var fastAndSaturated = readTimeMs < 20 && bytesRead == _currentBufferSize;
    if (fastAndSaturated && _currentBufferSize < MaxBufferSize)
    {
        // Connection is keeping up with the buffer — grow it.
        _currentBufferSize = Math.Min(MaxBufferSize, _currentBufferSize * 2);
    }
}
}