using DeepDrftModels.DTOs;
using DeepDrftModels.Enums;
using DeepDrftPublic.Client.Clients;
using System.Buffers;
using Microsoft.Extensions.Logging;
using Microsoft.JSInterop;

namespace DeepDrftPublic.Client.Services;

/// <summary>
/// Audio player that streams a track as a sequence of bounded HTTP Range segments, feeding the bytes
/// to the JS decoder chunk by chunk and starting playback as soon as the decoder reports it can.
/// </summary>
public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerService
{
    // Configuration constants
    private const int DefaultBufferSize = 32 * 1024; // 32KB chunks
    private const int NotificationThrottleMs = 100; // Throttle UI updates to max 10 per second

    // Adaptive chunk sizing
    private const int MinBufferSize = 16 * 1024; // 16KB minimum
    private const int MaxBufferSize = 64 * 1024; // 64KB maximum

    // Phase 21.2a back-pressure poll interval. While the scheduler is over its forward high-water
    // mark, the segment loop stops fetching the next segment and polls IsProductionPaused at this
    // cadence until the fill drains below low-water. 100 ms is well under the low-water lookahead
    // (seconds), so resume is prompt relative to the playhead — no starvation (AC3) — while keeping
    // the poll cheap. The poll honors the loop's cancellation token, so a track switch/seek during a
    // pause exits through the same drain discipline as a pause during ReadAsync (C6).
    private const int BackpressurePollMs = 100;

    // Phase 21 Direction B — forward Range-segment size. The forward stream is fetched as a
    // sequence of bounded "bytes=cursor-(cursor+SegmentSizeBytes-1)" 206 requests, the next issued
    // only when the scheduler drains below low-water. Because each request is bounded and fully
    // consumed before the next is issued, the browser fetch holds AT MOST ~one segment of raw bytes
    // regardless of file size — this is the network-memory bound the phase exists for (the open-ended
    // single GET buffered the whole ~970 MB body in the browser even when reads were paused, the
    // 21.4 finding). 4 MB balances request overhead (a 1 GB mix is ~250 segments) against memory:
    // at the 30 s high-water mark a fast connection holds well under a segment of unplayed raw bytes,
    // so the bound is the segment size, not the decoded window. Tunable; not magic.
    private const long SegmentSizeBytes = 4 * 1024 * 1024;

    private int _currentBufferSize = DefaultBufferSize;
    private int _consecutiveSlowReads = 0;

    // Streaming state properties
    public bool IsStreamingMode { get; private set; } = false;
    public bool CanStartStreaming { get; private set; } = false;
    public bool HeaderParsed { get; private set; } = false;
    public int BufferedChunks { get; private set; } = 0;
    public bool IsSeekingBeyondBuffer { get; private set; } = false;

    private bool _streamingPlaybackStarted = false;
    private CancellationTokenSource? _streamingCancellation;
    private Task? _activeStreamingTask;
    private DateTime _lastNotification = DateTime.MinValue;
    private readonly ILogger _logger;
    private string? _currentTrackId;

    // The delivery format the active load resolved to (Phase 18). Captured once per LoadTrackStreaming and
    // reused by the seek-beyond-buffer re-fetch so the Range continuation requests the SAME artifact the
    // initial stream did — a seek must never switch formats mid-track (the JS decoder, the cached setup
    // header, and the byte offsets all belong to one artifact). Defaults to Lossless until a load resolves.
    private AudioFormat _currentFormat = AudioFormat.Lossless;

    // Phase 16 play-session telemetry (§2.1). The tracker observes the playback lifecycle and emits at
    // most one bucketed play event per session, behind the engagement floor. Attached after construction
    // by AudioPlayerProvider (the player is not DI-registered), mirroring how QueueService binds — no
    // constructor growth propagated through DI, no construction cycle. Null when telemetry is not wired
    // (e.g. unit tests that construct the player without it), so every call is null-guarded.
    private PlayTracker? _playTracker;
    private BeaconInterop? _beacon;

    // DotNetObjectReference.Create(this) returns the generic reference type; the non-generic
    // DotNetObjectReference is only the static factory and cannot be used as a field type.
    private DotNetObjectReference<StreamingAudioPlayerService>? _unloadRef;
    private string? _unloadKey;

    // One-shot guard so the play session opens exactly once per LoadTrackStreaming — never on the
    // SeekBeyondBuffer re-stream, which reuses _currentTrackId and re-runs the playback-start transition
    // with _streamingPlaybackStarted reset. A seek-beyond-buffer is the SAME play (§1d), so it must not
    // open a new session. Set true when the session opens; reset only by LoadTrackStreaming.
    private bool _sessionOpened;

    /// <summary>
    /// Creates the streaming player. The interop service and media client are forwarded to the base
    /// player; the logger is kept for streaming diagnostics.
    /// </summary>
    /// <param name="audioInterop">JS audio interop used by the base player and the streaming loop.</param>
    /// <param name="trackMediaClient">Client used to fetch track media, sidecars and waveform profiles.</param>
    /// <param name="logger">Logger for streaming diagnostics.</param>
    public StreamingAudioPlayerService(
        AudioInteropService audioInterop,
        TrackMediaClient trackMediaClient,
        ILogger logger)
        : base(audioInterop, trackMediaClient)
    {
        _logger = logger;
    }

    /// <summary>
    /// Wire the play-session tracker and beacon transport into the player after construction (Phase 16
    /// §2.1). Called once by AudioPlayerProvider. Kept off the constructor deliberately: the player
    /// is built with <c>new</c> by the provider (not DI), so threading the tracker through the constructor
    /// would force the provider to resolve it too — instead the provider injects the tracker's collaborators
    /// and hands a built tracker here, the same post-construction binding QueueService uses. Also registers
    /// the page-unload handler so a mid-play tab-close still records the play via sendBeacon.
    /// </summary>
    /// <param name="tracker">The play-session tracker to notify of playback lifecycle events.</param>
    /// <param name="beacon">The beacon interop used to register the page-unload callback.</param>
    public void AttachTracker(PlayTracker tracker, BeaconInterop beacon)
    {
        _playTracker = tracker;
        _beacon = beacon;
        _unloadRef = DotNetObjectReference.Create(this);
        _unloadKey = PlayerId;

        // Fire-and-forget: registration only needs to have happened before the listener leaves; it
        // never gates playback. A failure simply means tab-close mid-play isn't recorded.
        _ = _beacon.RegisterUnloadAsync(_unloadKey, _unloadRef, nameof(OnPageUnload));
    }

    /// <summary>
    /// Close the open play session as the page unloads (pagehide / visibility→hidden). Invoked
    /// synchronously from the beacon's unload handler so the session's beacon is queued before the page
    /// freezes.
/// The tracker's <c>Close</c> is idempotent, so a later organic close is a no-op.
    /// </summary>
    [JSInvokable]
    public void OnPageUnload() => _playTracker?.Close();

    // Advance the play-session high-water mark on each progress tick (§2.1). Seeking backward never
    // lowers it — the tracker takes the max.
    protected override void OnProgressTick(double currentTime) => _playTracker?.OnProgress(currentTime);

    // Organic end-of-stream closes the session; the bucket reflects the high-water fraction reached.
    protected override void OnPlaybackEnded() => _playTracker?.Close();

    /// <summary>
    /// Selects a track for playback. In this player every selection goes through the streaming path.
    /// </summary>
    public override async Task SelectTrack(TrackDto track)
    {
        await SelectTrackStreaming(track);
    }

    /// <summary>
    /// Initializes the player if needed and makes sure the JS AudioContext is ready.
    /// </summary>
    public async Task WarmAudioContext()
    {
        await EnsureInitializedAsync();
        await _audioInterop.EnsureAudioContextReady(PlayerId);
    }

    /// <summary>
    /// Selects a track and starts streaming it: initializes the player, readies the AudioContext,
    /// announces the selection, runs the streaming load, then publishes the resulting state.
    /// </summary>
    public async Task SelectTrackStreaming(TrackDto track)
    {
        await EnsureInitializedAsync();

        // Resume AudioContext immediately on track selection (user interaction) to avoid clicks later
        await _audioInterop.EnsureAudioContextReady(PlayerId);

        await NotifyTrackSelected();
        await LoadTrackStreaming(track);
        await NotifyStateChanged();
    }

    /// <summary>
    /// Exposes a track as the current track without starting any audio work.
    /// </summary>
    public async Task StageTrack(TrackDto track)
    {
        // Pure state: expose the track as current so the bar shows it ready, but do NOT
        // initialize the player, resume the AudioContext, or start streaming. Those steps
        // require a user gesture and run on the first play click via SelectTrackStreaming.
        CurrentTrack = track;
        ErrorMessage = null;
        await NotifyStateChanged();
    }

    /// <summary>
    /// Resets the player, resolves the delivery format, fetches the first bounded segment, initializes
    /// the JS decoder with the total length, and then runs the shared segment loop to completion.
    /// </summary>
    /// <param name="track">The track to load; its <c>EntryKey</c> identifies the media to fetch.</param>
    private async Task LoadTrackStreaming(TrackDto track)
    {
        // Always reset to clean state before loading new track. ResetToIdle
        // both cancels and awaits any in-flight streaming loop, so by the time
        // we return from it the previous loop is guaranteed to have exited and
        // there is no risk of interleaved ProcessStreamingChunk calls against
        // the single-instance JS StreamDecoder.
        await ResetToIdle();

        // Save track ID for seek operations
        _currentTrackId = track.EntryKey;

        // A fresh load is a fresh play candidate (§1d: replays = multiple plays). Arm the
        // one-shot session-open guard; the session actually opens at the playback-start transition
        // below (a track that fails to load never reaches it, so it does not count).
        _sessionOpened = false;

        // Expose to UI immediately — Now-Playing surfaces should reflect the selected
        // track while it's still loading, not only after playback starts.
        CurrentTrack = track;

        // Create new cancellation token for this streaming operation. Capture it in a local
        // so the catch/finally can compare identity against _streamingCancellation: a seek
        // replaces _streamingCancellation with its own seekCts before this load's continuation
        // resumes on the single-threaded WASM dispatcher, and we must not clobber the seek's state.
        var loadCts = new CancellationTokenSource();
        _streamingCancellation = loadCts;

        // Capture the token once so every later use reads the same value without touching the
        // source again after awaits.
        var loadToken = loadCts.Token;

        // Fetch the waveform profile alongside the audio. Fire-and-forget against the same
        // streaming token so a track switch abandons it; it only updates display state and must
        // never gate or fail the audio load (a missing profile yields the flat-seekbar fallback).
        _ = LoadWaveformProfileAsync(track.EntryKey, loadToken);

        // The first segment is owned by this method until it is handed to the segment loop. Tracking
        // it here lets the finally block release it on ANY exit before the hand-off (including an
        // exception thrown by decoder initialization), so the response stream is never leaked.
        TrackMediaResponse? pendingSegment = null;

        try
        {
            // Set state to indicate loading has started
            ErrorMessage = null;
            LoadProgress = 0;
            IsLoading = true;
            IsStreamingMode = true;

            // Reset adaptive buffer sizing
            _currentBufferSize = DefaultBufferSize;
            _consecutiveSlowReads = 0;

            await NotifyStateChanged();

            // Resolve the delivery format for this load BEFORE requesting bytes (Phase 18, default policy
            // OQ2). When Opus is chosen the sidecar is fetched and injected into the JS player here, ahead of
            // InitializeStreaming, honouring the 18.4 set-before-init contract. The result is captured so the
            // seek-beyond-buffer re-fetch reuses the same artifact.
            _currentFormat = await ResolveStreamFormatAsync(track.EntryKey, loadToken);

            // Direction B: fetch the FIRST bounded segment to learn the total file length and the
            // content type. The 206 Content-Range carries the total; the segment loop advances its
            // cursor toward it. The decoder is initialized with the TOTAL length (not the segment
            // length) so a bounded segment's small Content-Length never trips its byte-count
            // completion early — segment boundaries are invisible to the decoder, which sees one
            // continuous in-order byte stream. Passing the streaming token aborts the server
            // connection on a navigation/track switch instead of leaving it draining bytes.
            var firstSegment = await _trackMediaClient.GetTrackMedia(
                track.EntryKey,
                byteOffset: 0,
                byteEnd: SegmentSizeBytes - 1,
                format: _currentFormat,
                cancellationToken: loadToken);

            if (!firstSegment.Success)
            {
                // GetTrackMedia reports a cancelled request as Success == false rather than throwing.
                // A superseded load must not surface that as a user-facing error: rethrow as
                // cancellation so it lands in the identity-guarded cancellation catch below.
                loadToken.ThrowIfCancellationRequested();

                var technicalError = firstSegment.GetMessage();
                _logger.LogError("Failed to get track media for {TrackId}: {Error}", track.EntryKey, technicalError);
                ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
                return;
            }

            if (firstSegment.Value == null)
            {
                const string technicalError = "No audio returned from server";
                _logger.LogError("No audio data returned for track {TrackId}", track.EntryKey);
                ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
                return;
            }

            // Ownership of the first segment transfers to the segment loop, which disposes it (and
            // every subsequent segment). Until that hand-off it is tracked in pendingSegment so the
            // finally block releases it on an early exit; a double dispose is avoided by clearing
            // pendingSegment at the hand-off.
            var audio = firstSegment.Value;
            pendingSegment = audio;

            // The total logical length the decoder must see. On a 206 the Content-Range carries it;
            // a 200 (server ignored Range / file ≤ one segment) has no Content-Range, so fall back to
            // the body's own Content-Length — that body IS the whole file in that case.
            var totalLength = audio.TotalLength ?? audio.ContentLength;

            // Initialize streaming mode with the TOTAL length and media type (drives JS
            // format-decoder selection). See above: total, not segment, length.
            var streamingResult = await _audioInterop.InitializeStreaming(PlayerId, totalLength, audio.ContentType);
            if (!streamingResult.Success)
            {
                // The un-transferred segment is disposed by the finally block.
                var technicalError = $"Failed to initialize streaming: {streamingResult.Error}";
                _logger.LogError("Streaming initialization failed for track {TrackId}: {Error}", track.EntryKey, technicalError);
                ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
                return;
            }

            // Forward segmentation from byte 0. The first segment is already in hand; the loop pumps
            // it, then fetches subsequent bounded segments gated on the scheduler fill signal.
            // Hand-off: from here the loop owns (and disposes) the segment.
            pendingSegment = null;
            _activeStreamingTask = RunSegmentedStreamAsync(
                track.EntryKey, audio, cursor: 0, totalLength, seekPosition: null, loadToken);
            await _activeStreamingTask;
        }
        catch (OperationCanceledException) when (loadCts.IsCancellationRequested)
        {
            // Cancellation is expected when this load was superseded (track switch or seek).
            // The when filter ensures HttpClient timeout OCEs — where loadCts was NOT
            // cancelled — fall through to the error handler below instead of being swallowed.
            _logger.LogDebug("Audio streaming cancelled for track {TrackId}", track.EntryKey);

            // Only reset streaming state if this load is still the active operation. A seek
            // in flight has already replaced _streamingCancellation with its own seekCts and
            // owns IsLoaded/IsStreamingMode; clobbering them here corrupts the seek mid-flight.
            if (ReferenceEquals(_streamingCancellation, loadCts))
            {
                IsLoaded = false;
                IsStreamingMode = false;
            }
        }
        catch (Exception ex)
        {
            StreamingErrorHandler.LogError(_logger, ex, "LoadTrackStreaming", track.EntryKey);
            var userError = StreamingErrorHandler.GetUserFriendlyMessage(ex.Message);

            // Mid-stream failure (playback was already underway): halt the JS scheduler into a clean
            // paused-but-loaded state exactly as the seek path does via RecoverFromFailedRefill, rather
            // than resetting to unloaded and letting the scheduler's buffered tail drain into a silent
            // false end (AC6). Apply only when this load is still the active operation — a superseding
            // seek owns state and has already replaced _streamingCancellation with its own CTS.
            if (_streamingPlaybackStarted && ReferenceEquals(_streamingCancellation, loadCts))
            {
                await RecoverFromFailedRefill(CurrentTime, userError);
            }
            else if (ReferenceEquals(_streamingCancellation, loadCts))
            {
                // First-segment failure (nothing buffered / playing yet), still the active operation:
                // the normal unload-to-error path is correct — nothing is in the scheduler to halt.
                ErrorMessage = userError;
                LoadProgress = 0;
                IsLoaded = false;
                IsStreamingMode = false;
            }
            else
            {
                // Superseded load: a newer seek (or track switch) has already claimed _streamingCancellation
                // and owns all shared state. Writing IsLoaded/IsStreamingMode here would corrupt the live
                // operation — mirror the OCE catch's identity guard and do nothing to shared state.
                _logger.LogDebug("Generic throw on superseded load for track {TrackId} — newer operation owns state, skipping unload", track.EntryKey);
            }
        }
        finally
        {
            IsLoading = false;

            // Release the first segment if it was never handed to the segment loop.
            pendingSegment?.Dispose();

            // Only notify if this load is still the active operation. A superseding seek
            // owns state notifications; firing here mid-seek would push a stale snapshot.
            if (ReferenceEquals(_streamingCancellation, loadCts))
            {
                await NotifyStateChanged();
            }
        }
    }

    /// <summary>
    /// Resolves which delivery format this load should request (Phase 18 default policy, OQ2): Opus when the
    /// browser can decode Ogg Opus AND a sidecar exists for the track, otherwise lossless. When Opus is
    /// chosen the sidecar is injected into the JS player here (set-before-init, the 18.4 contract) so the
    /// decoder has its setup header + seek index before InitializeStreaming builds it.
    /// <para>
    /// This is the single, deliberately-overridable seam for the listener quality preference (wave 18.6).
    /// 18.6 overrides this to honour the user's "streaming quality" toggle — returning lossless when the
    /// listener picked it, and otherwise falling through to this capability-gated default. The capability
    /// gate (AC7) and the sidecar-absent → lossless fallback (C2) stay here so any override inherits both:
    /// a browser that cannot decode Opus, or a track with no sidecar, always lands on lossless and plays.
    /// </para>
    /// </summary>
    /// <param name="entryKey">Key of the track whose sidecar is looked up.</param>
    /// <param name="cancellationToken">Token passed to the sidecar fetch.</param>
    /// <returns>The format to request: <c>Opus</c> when usable, otherwise <c>Lossless</c>.</returns>
    protected virtual async Task<AudioFormat> ResolveStreamFormatAsync(string entryKey, CancellationToken cancellationToken)
    {
        // Capability gate first (AC7): never hand Ogg Opus to a browser that cannot decode it.
        if (!await _audioInterop.CanDecodeOggOpus())
        {
            return AudioFormat.Lossless;
        }

        // The sidecar must be present (and parseable by the JS decoder) to seek an Opus stream. Its absence
        // means the track has no Opus artifact yet (legacy / not backfilled / transcode failed) — request
        // lossless rather than Opus-without-a-sidecar (the server would C2-fall-back anyway, but asking for
        // lossless keeps the request honest and avoids a wasted Opus-then-fallback round-trip).
        var sidecar = await _trackMediaClient.GetOpusSidecarAsync(entryKey, cancellationToken);
        if (!sidecar.Success || sidecar.Value is not { Length: > 0 } sidecarBytes)
        {
            return AudioFormat.Lossless;
        }

        // Inject BEFORE InitializeStreaming (the set-before-init contract). A parse failure here means the
        // bytes are not a usable sidecar — fall back to lossless so a malformed sidecar never breaks playback.
        var injected = await _audioInterop.SetOpusSidecar(PlayerId, sidecarBytes);
        if (!injected.Success)
        {
            _logger.LogWarning("Opus sidecar for {EntryKey} failed to parse ({Error}); falling back to lossless.", entryKey, injected.Error);
            return AudioFormat.Lossless;
        }

        return AudioFormat.Opus;
    }

    /// <summary>
    /// Fetches and decodes the track's waveform loudness profile, then notifies state so the
    /// seek zone re-renders with real bars. Best-effort: a 404 (no stored profile) or any other
    /// failure simply leaves <c>WaveformProfile</c> null, which the
    /// WaveformSeeker renders as a flat-but-seekable fallback. Never throws into the load path.
    /// </summary>
    /// <param name="entryKey">Key of the track whose profile is fetched.</param>
    /// <param name="cancellationToken">Token that abandons the fetch on a track switch or stop.</param>
    private async Task LoadWaveformProfileAsync(string entryKey, CancellationToken cancellationToken)
    {
        WaveformProfile = null;
        try
        {
            var result = await _trackMediaClient.GetWaveformProfileAsync(entryKey, cancellationToken);
            if (cancellationToken.IsCancellationRequested) return;

            if (result.Success && result.Value is { } dto)
            {
                WaveformProfile = DecodeWaveformProfile(dto);
                await NotifyStateChanged();
            }
        }
        catch (OperationCanceledException)
        {
            // Track switched or stopped before the profile arrived — nothing to surface.
        }
        catch (Exception ex)
        {
            // A failed profile fetch must not disturb playback; log and fall back to flat bars.
            _logger.LogDebug(ex, "Failed to load waveform profile for {EntryKey}", entryKey);
        }
    }

    /// <summary>
    /// Decodes a <c>WaveformProfileDto</c> (base64 of byte[BucketCount], each 0..255) into
    /// a normalized double[] in [0, 1]. Returns null if the payload is malformed so callers treat
    /// it as "no profile" rather than rendering garbage bars.
    /// </summary>
    /// <param name="dto">The profile payload; its <c>Data</c> is the base64 string to decode.</param>
    /// <returns>One value per decoded byte, or null when the data is empty or not valid base64.</returns>
    private static double[]? DecodeWaveformProfile(WaveformProfileDto dto)
    {
        if (string.IsNullOrEmpty(dto.Data)) return null;

        byte[] bytes;
        try
        {
            bytes = Convert.FromBase64String(dto.Data);
        }
        catch (FormatException)
        {
            return null;
        }

        if (bytes.Length == 0) return null;

        var profile = new double[bytes.Length];
        for (var i = 0; i < bytes.Length; i++)
        {
            profile[i] = bytes[i] / 255.0;
        }

        return profile;
    }

    /// <summary>
    /// Phase 21 Direction B — the single segmented forward read loop, shared by the initial load and
    /// the seek/refill path (the convergence C1/C5 require: one cursor, one fetch mechanism, no forked
    /// path). It pumps the FIRST segment (already fetched by the caller), then fetches subsequent
    /// bounded <c>bytes=cursor-(cursor+SegmentSizeBytes-1)</c> 206 segments — each only AFTER the
    /// scheduler drains below low-water — until the cursor reaches <paramref name="totalLength"/>.
    /// Because each segment is bounded and fully consumed before the next is requested, the browser
    /// holds at most ~one segment of raw bytes (the network-memory bound), while the decoder sees one
    /// continuous in-order byte stream across segment boundaries (the demuxer/decoder buffer partial
    /// frames/pages across the boundary exactly as for arbitrary chunks today — no per-segment reinit).
    /// </summary>
    /// <param name="trackId">Key of the track the subsequent segments are fetched for.</param>
    /// <param name="firstSegment">The already-fetched first segment (byte <paramref name="cursor"/>).
    /// Owned by this method, which disposes it; subsequent segments are fetched and disposed inline.</param>
    /// <param name="cursor">File-absolute byte offset the first segment starts at (0 for a fresh load,
    /// the resolved seek offset for a refill).</param>
    /// <param name="totalLength">Total file length in bytes — the EOF boundary the cursor advances
    /// toward. The decoder is initialized/reinitialized against this, not the per-segment length.</param>
    /// <param name="seekPosition">Non-null for a seek/refill: the decoder is reinitialized for the
    /// header-less Range continuation at this time before the first segment's bytes are fed (WAV
    /// retains its header, Opus re-applies the cached setup + lead-trim). Null for a forward load from
    /// byte 0, where the first segment carries the header and no reinit is needed.</param>
    /// <param name="cancellationToken">Token that stops the loop on a track switch or seek.</param>
private async Task RunSegmentedStreamAsync(
        string trackId,
        TrackMediaResponse firstSegment,
        long cursor,
        long totalLength,
        double? seekPosition,
        CancellationToken cancellationToken)
    {
        // Pumps firstSegment into the JS decoder, then fetches and pumps further bounded segments
        // until the cursor reaches totalLength. Owns and disposes every segment it touches.
        // Cancellation propagates as OperationCanceledException; any other failure is logged,
        // written to the error state and rethrown.
        byte[]? buffer = null;
        var segment = firstSegment;

        try
        {
            // Seek/refill: reinitialize the active decoder for the header-less continuation ONCE,
            // before any continuation bytes are fed. Forward-from-zero (seekPosition null) skips this
            // — its first segment carries the real header the decoder parses. Done here, inside the
            // single loop, so seek and forward share the same fetch+pump mechanism (no forked path).
            if (seekPosition is { } resumeAt)
            {
                // The decoder byte-counts the header-less continuation against the bytes REMAINING
                // from the range start to EOF (total − cursor), not the absolute total — that is what
                // reinitializeForRangeContinuation expects (StreamDecoder.remainingByteLength). The
                // loop's own cursor still targets the absolute totalLength for EOF.
                var remainingBytes = Math.Max(0, totalLength - cursor);
                var reinitResult = await _audioInterop.ReinitializeFromOffset(PlayerId, remainingBytes, resumeAt);
                if (!reinitResult.Success)
                {
                    throw new InvalidOperationException($"Failed to reinitialize for offset streaming: {reinitResult.Error}");
                }
            }

            buffer = ArrayPool<byte>.Shared.Rent(MaxBufferSize); // larger rental to fit adaptive sizing
            var readTimer = System.Diagnostics.Stopwatch.StartNew();

            // A non-positive total means the length is unknown; the cursor can then never "reach" it.
            var totalKnown = totalLength > 0;

            // Segment loop. Each iteration fully consumes one bounded 206 body, advancing the cursor by
            // the bytes received. The next segment is fetched only when the scheduler is below
            // high-water (the inter-segment gate). EOF is the cursor reaching totalLength; when the
            // total is unknown, a short segment (the final slice) is the only EOF signal available.
            while (true)
            {
                long segmentBytesRead = 0;
                int currentBytes;

                do
                {
                    readTimer.Restart();
                    currentBytes = await segment.Stream.ReadAsync(buffer, 0, _currentBufferSize, cancellationToken);
                    readTimer.Stop();

                    AdaptBufferSize(currentBytes, readTimer.ElapsedMilliseconds);

                    if (currentBytes > 0)
                    {
                        segmentBytesRead += currentBytes;

                        // Slice to the exact bytes read: the pooled buffer is rented at MaxBufferSize
                        // and may carry stale bytes past currentBytes from a prior rental — handing the
                        // full array to JS would serialise that garbage into the audio stream.
                        var actualBuffer = buffer.AsSpan(0, currentBytes).ToArray();
                        var chunkResult = await _audioInterop.ProcessStreamingChunk(PlayerId, actualBuffer);

                        if (!chunkResult.Success)
                        {
                            var error = $"Failed to process streaming chunk: {chunkResult.Error}";
                            _logger.LogWarning("Chunk processing failed: {Error}", error);
                            throw new InvalidOperationException(error);
                        }

                        CanStartStreaming = chunkResult.CanStartStreaming;
                        HeaderParsed = chunkResult.HeaderParsed;
                        BufferedChunks = chunkResult.BufferCount;
                        // chunkResult.ProductionPaused is informational only on this path — back-pressure
                        // granularity is one segment (the inter-segment gate below), not per-chunk.

                        // Set duration from header when available (only set once)
                        if (chunkResult.Duration.HasValue && Duration == null)
                        {
                            Duration = chunkResult.Duration.Value;
                            _logger.LogInformation("Duration set from header: {Duration:F2} seconds", Duration);

                            // Feed the once-only duration to the play session for the completion
                            // fraction. No-op when no session is open; idempotent otherwise.
                            _playTracker?.SetDuration(chunkResult.Duration.Value);
                        }

                        // Start playback as soon as we can — at the min-buffer threshold, exactly as
                        // before (C2: first audio is not gated on the segment boundary; the first
                        // segment alone clears the threshold).
                        if (!_streamingPlaybackStarted && CanStartStreaming)
                        {
                            var playbackResult = await _audioInterop.StartStreamingPlayback(PlayerId);
                            if (playbackResult.Success)
                            {
                                _streamingPlaybackStarted = true;
                                IsPlaying = true;
                                IsPaused = false;
                                IsLoaded = true; // loaded and ready, even while still downloading
                                ErrorMessage = null;

                                // Open the play session exactly once per load, at the moment playback
                                // truly begins (§2.1). The _sessionOpened guard keeps a seek/refill
                                // re-stream — which re-enters this transition with
                                // _streamingPlaybackStarted reset — from opening a second session for
                                // the same play. Duration may already be known, so re-feed it.
                                if (!_sessionOpened && _currentTrackId is { } trackKey)
                                {
                                    _sessionOpened = true;
                                    _playTracker?.OnPlaybackStarted(trackKey);
                                    if (Duration is { } d) _playTracker?.SetDuration(d);
                                }

                                await NotifyStateChanged(); // immediate — critical state change
                            }
                            else
                            {
                                var technicalError = $"Failed to start streaming playback: {playbackResult.Error}";
                                _logger.LogError("Failed to start playback: {Error}", technicalError);
                                ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
                            }
                        }

                        // Progress against the total file length (cursor + bytes consumed so far).
                        if (totalLength > 0)
                        {
                            LoadProgress = Math.Min(1.0, (double)(cursor + segmentBytesRead) / totalLength);
                        }

                        await ThrottledNotifyStateChanged();
                    }
                } while (currentBytes > 0);

                // Segment fully consumed; advance the cursor and release this segment's stream/socket
                // before deciding whether to fetch the next. Disposing here keeps exactly one segment's
                // raw bytes resident at a time.
                cursor += segmentBytesRead;
                segment.Dispose();
                segment = null!;

                // EOF: cursor reached the total file length. With a known total this is the sole
                // forward-EOF condition. A short segment body (segmentBytesRead < SegmentSizeBytes) is
                // NOT an EOF signal there — the inner read loop fully drains the HTTP body, so a short
                // body means the server sent fewer bytes than the bounded range we requested. While
                // cursor < totalLength that can only be a connection drop / truncated stream, NOT the
                // file tail — route it to the same clean-failure recovery as a fetch error rather than
                // silently completing.
                if (totalKnown && cursor >= totalLength)
                {
                    break;
                }

                if (segmentBytesRead < SegmentSizeBytes)
                {
                    if (!totalKnown)
                    {
                        // Unknown total: there is no byte count to compare against, so the short final
                        // slice is the end of the stream. Fall through to the explicit completion signal
                        // below instead of failing a stream that could never otherwise complete.
                        break;
                    }

                    // Guard: the body was short but we haven't reached totalLength, so the stream was
                    // truncated mid-segment (connection drop / premature close). Surface as an error so
                    // the scheduler is halted rather than left to drain its buffered tail into a false end.
                    throw new System.IO.EndOfStreamException(
                        $"Stream truncated at byte {cursor} of {totalLength}: received {segmentBytesRead} bytes " +
                        $"but expected up to {SegmentSizeBytes} and have not reached EOF");
                }

                // Inter-segment back-pressure gate (Phase 21.2 fill signal, now gating SEGMENT FETCH
                // instead of pacing ReadAsync on an open stream). Do not fetch the next segment while
                // the scheduler is over high-water; wait until it drains below low-water. Because the
                // browser only buffers bounded segments and we hold off requesting the next one, raw
                // network memory stays at ~one segment. The poll awaits on cancellationToken, so a
                // track switch/seek mid-wait throws OCE and unwinds through the existing drain
                // discipline (C6). UC5: a user pause freezes the playhead so the fill never drains —
                // hold here until playback resumes (IsPaused clears) OR the fill drains on its own.
                while (IsPaused || await _audioInterop.IsProductionPaused(PlayerId))
                {
                    cancellationToken.ThrowIfCancellationRequested();
                    await Task.Delay(BackpressurePollMs, cancellationToken);
                }

                // Fetch the next bounded segment. The end offset is clamped implicitly by the server
                // (a request past EOF yields the available tail as a short slice, caught above).
                var nextEnd = cursor + SegmentSizeBytes - 1;
                var nextResult = await _trackMediaClient.GetTrackMedia(
                    trackId,
                    byteOffset: cursor,
                    byteEnd: nextEnd,
                    format: _currentFormat,
                    cancellationToken: cancellationToken);

                if (!nextResult.Success || nextResult.Value == null)
                {
                    // GetTrackMedia reports a cancelled request as a failed result instead of throwing.
                    // When this loop was superseded, rethrow as cancellation so the generic catch below
                    // does not overwrite IsLoaded/IsStreamingMode/ErrorMessage that the newer operation
                    // now owns.
                    cancellationToken.ThrowIfCancellationRequested();

                    var technicalError = nextResult.GetMessage() ?? "Failed to fetch next stream segment";
                    _logger.LogError("Failed to fetch segment at offset {Offset} for {TrackId}: {Error}", cursor, trackId, technicalError);
                    throw new System.IO.IOException(technicalError);
                }

                segment = nextResult.Value;
            }

            // Notify the JS decoder that the stream is finished. The decoder marks completion by byte
            // count against the total it was initialized with; this explicit signal flushes the
            // residual tail and covers the (rare) case where the total was unknown.
            await _audioInterop.MarkStreamCompleteAsync(PlayerId);

            LoadProgress = 1.0;
            await NotifyStateChanged();
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Cancellation is expected during track switch or seek — propagate cleanly.
            throw;
        }
        catch (Exception ex)
        {
            StreamingErrorHandler.LogError(_logger, ex, "RunSegmentedStreamAsync");
            ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(ex.Message);
            LoadProgress = 0;
            IsLoaded = false;
            IsStreamingMode = false;
            await NotifyStateChanged();
            throw;
        }
        finally
        {
            // Release the last segment (if a fetch failed mid-loop it may still be held) and the buffer.
            segment?.Dispose();
            if (buffer != null)
            {
                ArrayPool<byte>.Shared.Return(buffer);
            }
        }
    }

    /// <summary>
    /// In streaming mode, Stop fully resets to Idle state since audio data is consumed.
    /// This is equivalent to Unload for streaming playback.
    /// </summary>
    public override async Task Stop()
    {
        // In streaming mode, Stop = Unload (data is consumed, can't replay)
        await ResetToIdle();
    }

    /// <summary>
    /// Fully resets the player to Idle state, ready for a new track.
///
    public override async Task Unload()
    {
        await ResetToIdle();
    }

    /// <summary>
    /// Override Seek to handle seek-beyond-buffer for streaming mode.
    /// </summary>
    /// <param name="position">Target playback position (logged in seconds).</param>
    public override async Task Seek(double position)
    {
        // Seeking is only meaningful against a loaded streaming track.
        if (!IsLoaded || !IsStreamingMode) return;

        try
        {
            var result = await _audioInterop.SeekAsync(PlayerId, position);

            if (result.Success)
            {
                if (result.SeekBeyondBuffer && result.ByteOffset >= 0)
                {
                    // Need to load new stream from offset
                    _logger.LogInformation("Seeking beyond buffer to {Position:F2}s, byte offset: {ByteOffset}", position, result.ByteOffset);
                    await SeekBeyondBuffer(position, result.ByteOffset);
                }
                else
                {
                    // Seek within buffer succeeded
                    CurrentTime = position;
                    ErrorMessage = null;
                    await NotifyStateChanged();
                }
            }
            else
            {
                ErrorMessage = $"Seek error: {result.Error}";
                await NotifyStateChanged();
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error seeking to position {Position}", position);
            ErrorMessage = $"Error seeking: {ex.Message}";
            await NotifyStateChanged();
        }
    }

    /// <summary>
    /// Handle seeking beyond the currently buffered content by requesting a new stream from offset.
    /// </summary>
    /// <param name="seekPosition">Playback position the listener asked for.</param>
    /// <param name="byteOffset">Byte offset (resolved JS-side) at which the re-fetch starts.</param>
    private async Task SeekBeyondBuffer(double seekPosition, long byteOffset)
    {
        if (string.IsNullOrEmpty(_currentTrackId))
        {
            ErrorMessage = "Cannot seek - no track loaded";
            // Surface the error like every other failure exit does; without this the UI never
            // learns the seek was rejected.
            await NotifyStateChanged();
            return;
        }

        // Capture into a non-null local: _currentTrackId is the field a track-switch could clear, but
        // this seek operates against the track loaded NOW; the segment loop needs a stable id.
        var trackId = _currentTrackId;

        IsSeekingBeyondBuffer = true;

        // Cancel the current streaming loop AND wait for it to fully exit before
        // starting a new one. The previous loop's pending ReadAsync will throw
        // OperationCanceledException asynchronously; if we kick off a new loop
        // immediately, both can race against the single-instance JS StreamDecoder
        // and corrupt decode state. Draining here is the load-bearing guarantee.
        //
        // Invariant: any caller that supersedes a load WITHOUT wanting the load's
        // state reset must assign its own CTS to _streamingCancellation *before*
        // its first await. LoadTrackStreaming's OCE continuation fires during the
        // drain await on the shared _activeStreamingTask; it resets IsLoaded/
        // IsStreamingMode only when _streamingCancellation still equals its loadCts.
        // Assigning seekCts synchronously here makes that identity check fail, so
        // the seek's state survives. (ResetToIdle deliberately does NOT do this —
        // it wants the reset, and nulls _streamingCancellation only after the drain.)
        var oldCts = _streamingCancellation;
        var seekCts = new CancellationTokenSource();

        // Read the token ONCE, before any await. A newer seek (or ResetToIdle) may dispose seekCts
        // while this method is suspended; CancellationTokenSource.Token throws
        // ObjectDisposedException after disposal, whereas an already-obtained token stays usable
        // and simply reports cancellation.
        var seekToken = seekCts.Token;

        _streamingCancellation = seekCts;
        oldCts?.Cancel();
        await DrainActiveStreamingTaskAsync();
        oldCts?.Dispose();

        // Single-writer discipline (C6/AC8): every exit that touches shared state must share the
        // same guard. TrackMediaClient.GetTrackMedia swallows OperationCanceledException and returns
        // Success==false, so a superseded seek usually lands in the media-fetch-fail branch below
        // rather than in the OCE catch. Without the guard those branches would call
        // RecoverFromFailedRefill — running clearForSeek + setPlaybackOffset against the player
        // state the NEWER seek now owns. A local predicate keeps all exits symmetric so a
        // future exit cannot forget the check.
        bool IsStillActiveSeek() => ReferenceEquals(_streamingCancellation, seekCts);

        try
        {
            // Update UI immediately
            CurrentTime = seekPosition;
            await NotifyStateChanged();

            // Request the FIRST bounded segment from the resolved offset (Direction B — converged with
            // the forward path). Reuse the format the initial load resolved to (_currentFormat): an
            // Opus seek must come back as Opus bytes so the cached setup header + page-aligned
            // byteOffset (resolved JS-side from the Opus seek index) match the continuation; WAV resolves
            // its offset from the header — one seam, format-appropriate math (AC9 / §3.4a C). The
            // segment loop then continues forward segmentation from this offset exactly as a fresh load
            // does from 0 — no forked fetch path (C1/C5).
            var firstSegment = await _trackMediaClient.GetTrackMedia(
                trackId,
                byteOffset,
                byteEnd: byteOffset + SegmentSizeBytes - 1,
                format: _currentFormat,
                cancellationToken: seekToken);

            if (!firstSegment.Success || firstSegment.Value == null)
            {
                var technicalError = firstSegment.GetMessage() ?? "Failed to load audio from position";
                _logger.LogError("Failed to get track media from offset {Offset}: {Error}", byteOffset, technicalError);

                // Guard: a superseded seek must NOT touch shared state. The newer seek owns teardown.
                if (IsStillActiveSeek())
                {
                    await RecoverFromFailedRefill(seekPosition, StreamingErrorHandler.GetUserFriendlyMessage(technicalError));
                }
                else
                {
                    _logger.LogDebug("Media-fetch failed on superseded seek to {Position} — newer seek owns state, skipping recovery", seekPosition);
                }
                return;
            }

            var audio = firstSegment.Value;

            // Guard the SUCCESS exit too: the fetch can complete just before a newer seek (or a
            // reset) supersedes this one. Proceeding would reset the newer owner's streaming flags
            // and overwrite its _activeStreamingTask, so release the segment we will never pump and
            // leave all shared state alone. No await sits between this check and the task
            // assignment below, so ownership cannot change in between.
            if (!IsStillActiveSeek())
            {
                _logger.LogDebug("Seek to {Position} superseded after its first segment arrived — discarding segment", seekPosition);
                audio.Dispose();
                return;
            }

            // The absolute EOF boundary the segment loop's cursor targets. On a 206 the Content-Range
            // carries the file total; on a 200 (single-segment file) fall back to cursor + body length.
            var totalLength = audio.TotalLength ?? (byteOffset + audio.ContentLength);

            // Reset streaming state for the new stream. The decoder reinit for the header-less
            // continuation happens INSIDE RunSegmentedStreamAsync (seekPosition non-null), so seek and
            // forward share one fetch+pump+reinit mechanism. A reinit failure there throws and lands in
            // the catch below, which recovers when still the active seek — the same clean-failure path
            // (AC6) the old explicit reinit branch had, now unified with the fetch-failure path.
            _streamingPlaybackStarted = false;
            CanStartStreaming = false;
            HeaderParsed = false;
            BufferedChunks = 0;

            // Stream from offset via the shared segment loop. Ownership of `audio` transfers to it.
            _activeStreamingTask = RunSegmentedStreamAsync(
                trackId, audio, cursor: byteOffset, totalLength, seekPosition, seekToken);
            await _activeStreamingTask;

            IsSeekingBeyondBuffer = false;
        }
        catch (OperationCanceledException) when (seekCts.IsCancellationRequested)
        {
            // Another seek or stop interrupted this one. Only clear the flag if we are
            // still the active seek — if _streamingCancellation has been replaced, a
            // newer seek is in progress and owns the flag.
            _logger.LogDebug("Seek beyond buffer cancelled");
            if (IsStillActiveSeek())
            {
                IsSeekingBeyondBuffer = false;
            }
        }
        catch (Exception ex)
        {
            // A refill fetch can fail deep into a long mix (the listener didn't initiate it). Recover
            // into a clean paused-but-loaded state (AC6) rather than leaving the starved scheduler to
            // fire a silent false end. Only when we are still the active seek — a superseding seek owns
            // the state and the OCE catch above handles its own teardown.
            _logger.LogError(ex, "Error during seek beyond buffer to position {Position}", seekPosition);
            if (IsStillActiveSeek())
            {
                await RecoverFromFailedRefill(seekPosition, StreamingErrorHandler.GetUserFriendlyMessage(ex.Message));
            }
        }
    }

    ///
    /// Clean-failure recovery for a window-miss refill (Phase 21.3 / AC6). A backward seek past the
    /// retained tail re-fetches via the existing Range path; that mid-stream fetch the listener did not
    /// initiate can fail deep into a long mix. When it does, the pre-seek loop has already been
    /// cancelled and drained, but the JS scheduler is still holding stale pre-seek buffers and still
    /// "playing" — left alone it drains them and fires a silent false end (the wedged/starved state AC6
    /// forbids). This halts the scheduler into a paused-but-loaded state at the requested seek position,
    /// surfaces a clear error, and leaves the track loaded so the listener can retry the seek or pick
    /// another track. Mirrors PlaybackScheduler.playFromPosition's end-of-buffer recovery: stop
    /// pretending to play.
///
    private async Task RecoverFromFailedRefill(double seekPosition, string userFacingError)
    {
        // Halt the starved scheduler JS-side (stop sources, drop stale buffers, anchor at the target).
        // Best-effort: if even this interop fails the player is no worse off, and we still surface the
        // error and settle C# state below. The call is wrapped so that a THROWING interop (not just
        // a Success == false result) cannot skip the state settlement — otherwise
        // IsSeekingBeyondBuffer / IsPlaying would be left stuck in their pre-failure values.
        try
        {
            var recovered = await _audioInterop.RecoverFromFailedRefill(PlayerId, seekPosition);
            if (!recovered.Success)
            {
                _logger.LogWarning("Refill-failure recovery interop did not succeed: {Error}", recovered.Error);
            }
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Refill-failure recovery interop threw; settling player state anyway");
        }

        // Settle C# into the matching recoverable state: not playing, paused at the target, still loaded
        // and still in streaming mode. IsLoaded = true and IsStreamingMode = true are both load-bearing —
        // the "paused-but-loaded" contract lets the listener retry the seek (Seek early-returns when
        // !IsLoaded || !IsStreamingMode), resume via TogglePlayPause, or pick another track. Resetting
        // either to false would wedge at least one of the three retry routes (AC6 / Phase 21.3).
        ErrorMessage = userFacingError;
        IsPlaying = false;
        IsPaused = true;
        IsLoaded = true;
        IsStreamingMode = true;
        CurrentTime = seekPosition;
        IsSeekingBeyondBuffer = false;
        await NotifyStateChanged();
    }

    /// <summary>
    /// Single method to reset all state - called by both Stop and Unload.
    /// </summary>
    private async Task ResetToIdle()
    {
        // 0. Close any open play session BEFORE tearing down (§2.1). ResetToIdle is the single funnel
        // for stop / unload / dispose / track-switch (a new LoadTrackStreaming calls it first), so a
        // superseded listen is recorded here with its high-water bucket. Close is idempotent — if the
        // session already closed organically or via the unload beacon, this is a no-op.
        _playTracker?.Close();
        _sessionOpened = false;

        // 1. Cancel any ongoing streaming operation and wait for it to exit
        // before tearing down JS state. Otherwise the loop's pending
        // ProcessStreamingChunk call can land after StopAsync/UnloadAsync.
        _streamingCancellation?.Cancel();
        await DrainActiveStreamingTaskAsync();
        _streamingCancellation?.Dispose();
        _streamingCancellation = null;

        // 2. Tell JS to stop and unload
        try
        {
            await _audioInterop.StopAsync(PlayerId);
            await _audioInterop.UnloadAsync(PlayerId);
        }
        catch
        {
            // Ignore JS errors during cleanup
        }

        // 3. Reset ALL state to Idle
        IsPlaying = false;
        IsPaused = false;
        IsLoaded = false;
        IsLoading = false;
        CurrentTime = 0;
        Duration = null;
        LoadProgress = 0;
        ErrorMessage = null;
        CurrentTrack = null;
        WaveformProfile = null;

        // 4. Reset streaming-specific state
        IsStreamingMode = false;
        CanStartStreaming = false;
        HeaderParsed = false;
        BufferedChunks = 0;
        _streamingPlaybackStarted = false;
        IsSeekingBeyondBuffer = false;
        _currentTrackId = null;
        _currentFormat = AudioFormat.Lossless;

        await NotifyStateChanged();
    }

    /// <summary>
    /// Wait for the previously-started streaming loop to fully exit. The caller
    /// must have already cancelled <see cref="_streamingCancellation"/>. Swallows
    /// the expected OperationCanceledException; any other exception was already
    /// surfaced through the loop's own catch block, so we ignore it here too.
    /// </summary>
    private async Task DrainActiveStreamingTaskAsync()
    {
        var task = _activeStreamingTask;
        if (task == null) return;

        try
        {
            await task;
        }
        catch (OperationCanceledException)
        {
            // Expected when we cancelled the loop ourselves.
        }
        catch
        {
            // Any other failure was already logged inside the loop.
        }
        finally
        {
            // Only clear if we are still the active task — a concurrent caller
            // may have started a new stream while we were draining the old one.
            if (ReferenceEquals(_activeStreamingTask, task))
            {
                _activeStreamingTask = null;
            }
        }
    }

    /// <summary>
    /// Raises a state-changed notification only if at least NotificationThrottleMs have elapsed
    /// since the last one raised through this method; otherwise the call is dropped.
    /// </summary>
    private async Task ThrottledNotifyStateChanged()
    {
        var now = DateTime.UtcNow;
        if ((now - _lastNotification).TotalMilliseconds >= NotificationThrottleMs)
        {
            _lastNotification = now;
            await NotifyStateChanged();
        }
    }

    /// <summary>
    /// On component unmount we must cancel the in-flight streaming loop and tear
    /// down JS callbacks before the JS side's setInterval fires again with a
    /// stale DotNetObjectReference. ResetToIdle covers cancellation + JS stop
    /// + state reset; the base then disposes the JS player and its callbacks.
    /// </summary>
    public override async ValueTask DisposeAsync()
    {
        try
        {
            // ResetToIdle closes any open play session, so a dispose mid-play still records the listen.
            await ResetToIdle();
        }
        catch
        {
            // Disposal must not throw; any failure here is best-effort cleanup.
        }

        // Detach the page-unload handler so the torn-down circuit is never invoked, then release the
        // self-reference. Best-effort — the JS side tolerates an absent key.
        if (_unloadKey is not null && _beacon is not null)
        {
            try
            {
                await _beacon.UnregisterUnloadAsync(_unloadKey);
            }
            catch
            {
                /* best-effort */
            }
        }

        _unloadRef?.Dispose();
        _unloadRef = null;

        await base.DisposeAsync();
    }

    /// <summary>
    /// Adapts the read-buffer size to observed read performance: halves it (down to MinBufferSize)
    /// after three consecutive slow reads, doubles it (up to MaxBufferSize) after a fast read that
    /// filled the whole buffer, and otherwise just clears the slow-read streak.
    /// </summary>
    /// <param name="bytesRead">Number of bytes the last read returned.</param>
    /// <param name="readTimeMs">How long the last read took, in milliseconds.</param>
    private void AdaptBufferSize(int bytesRead, long readTimeMs)
    {
        // Thresholds for classifying a single read; named so the branches below read as intent.
        const long SlowReadThresholdMs = 100;
        const long FastReadThresholdMs = 20;
        const int SlowReadsBeforeShrink = 3;

        // Adaptive buffer sizing based on network performance
        if (readTimeMs > SlowReadThresholdMs) // Slow read (>100ms)
        {
            _consecutiveSlowReads++;
            if (_consecutiveSlowReads >= SlowReadsBeforeShrink && _currentBufferSize > MinBufferSize)
            {
                // Reduce buffer size for slow connections
                _currentBufferSize = Math.Max(MinBufferSize, _currentBufferSize / 2);
                _consecutiveSlowReads = 0;
            }
        }
        else if (readTimeMs < FastReadThresholdMs && bytesRead == _currentBufferSize) // Fast read, buffer fully utilized
        {
            _consecutiveSlowReads = 0;
            if (_currentBufferSize < MaxBufferSize)
            {
                // Increase buffer size for fast connections
                _currentBufferSize = Math.Min(MaxBufferSize, _currentBufferSize * 2);
            }
        }
        else
        {
            _consecutiveSlowReads = 0;
        }
    }
}