Files
deepdrft/DeepDrftPublic.Client/Services/StreamingAudioPlayerService.cs
T
daniel-c-harvey 369cb86437 Add [BP-DIAG] back-pressure instrumentation for Phase 21.4 browser run
Temporary, grep-tagged diagnostics at the read-loop pause, the scheduler
latch, and the chunk-result path to show whether ProductionPaused latches,
reaches C#, and parks the loop. Strip once the cause is confirmed.
2026-06-24 09:00:38 -04:00

965 lines
46 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using DeepDrftModels.DTOs;
using DeepDrftModels.Enums;
using DeepDrftPublic.Client.Clients;
using System.Buffers;
using Microsoft.Extensions.Logging;
using Microsoft.JSInterop;
namespace DeepDrftPublic.Client.Services;
public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerService
{
// Configuration constants
private const int DefaultBufferSize = 32 * 1024; // 32KB chunks
private const int NotificationThrottleMs = 100; // Throttle UI updates to max 10 per second
// Adaptive chunk sizing
private const int MinBufferSize = 16 * 1024; // 16KB minimum
private const int MaxBufferSize = 64 * 1024; // 64KB maximum
// Phase 21.2a back-pressure poll interval. While the scheduler is over its forward high-water
// mark, the read loop stops calling ReadAsync and polls IsProductionPaused at this cadence
// until the fill drains below low-water. 100 ms is well under the low-water lookahead (seconds),
// so resume is prompt relative to the playhead — no starvation (AC3) — while keeping the poll
// cheap. The poll honors the loop's cancellation token, so a track switch/seek during a pause
// exits through the same drain discipline as a pause during ReadAsync (C6).
private const int BackpressurePollMs = 100;
// Number of bytes requested per ReadAsync call in the streaming read loop. Reset to
// DefaultBufferSize at the start of every load. AdaptBufferSize (defined outside this view) runs
// after each read and presumably adjusts it between MinBufferSize and MaxBufferSize — confirm there.
private int _currentBufferSize = DefaultBufferSize;
// Reset to 0 at the start of every load. NOTE(review): presumably the slow-read streak counter
// maintained by AdaptBufferSize — confirm against that method.
private int _consecutiveSlowReads = 0;
// Streaming state properties
// Set true when a streaming load begins; cleared on the failure and (still-active) cancellation
// paths of the load and of the streaming loop. Seek is a no-op while this is false.
public bool IsStreamingMode { get; private set; } = false;
// Mirror of the latest chunk result's CanStartStreaming; gates the one-time playback start.
// Reset to false before a seek-beyond-buffer re-stream.
public bool CanStartStreaming { get; private set; } = false;
// Mirror of the latest chunk result's HeaderParsed. Reset to false before a seek-beyond-buffer re-stream.
public bool HeaderParsed { get; private set; } = false;
// Mirror of the latest chunk result's BufferCount. Reset to 0 before a seek-beyond-buffer re-stream.
public int BufferedChunks { get; private set; } = 0;
// True from the start of a seek-beyond-buffer until that seek's re-stream completes, is cancelled
// while still the active seek, or is settled by the failed-refill recovery.
public bool IsSeekingBeyondBuffer { get; private set; } = false;
// ───────────────────────────────────────────────────────────────────────────────────────────
// [BP-DIAG] Phase 21.4 back-pressure diagnostic. TEMPORARY — strip once the cause is confirmed
// in Daniel's browser run. Logs every Nth chunk's ProductionPaused flag plus pause-poll
// enter/exit so a grep for "[BP-DIAG]" in the WASM console tells whether the read loop ever sees
// the pause signal and whether the poll actually holds. Throttled by chunk count to avoid flooding.
private const int BpDiagChunkLogEvery = 16;
// ───────────────────────────────────────────────────────────────────────────────────────────
// True once StartStreamingPlayback has succeeded for the current stream; reset to false before a
// seek-beyond-buffer re-stream so playback is started again for the new stream.
private bool _streamingPlaybackStarted = false;
// CTS of the operation (load or seek) that currently owns streaming state. Compared by reference
// identity so a superseded operation can tell it no longer owns shared state.
private CancellationTokenSource? _streamingCancellation;
// The most recently started StreamAudioWithEarlyPlayback task; awaited (drained) before a new
// loop is started so two loops never feed the JS decoder at once.
private Task? _activeStreamingTask;
// NOTE(review): presumably the timestamp used by ThrottledNotifyStateChanged together with
// NotificationThrottleMs — that method is outside this view; confirm.
private DateTime _lastNotification = DateTime.MinValue;
private readonly ILogger<StreamingAudioPlayerService> _logger;
// EntryKey of the track handed to the last LoadTrackStreaming; reused by the seek-beyond-buffer
// re-fetch and as the key the play session is opened with.
private string? _currentTrackId;
// The delivery format the active load resolved to (Phase 18). Captured once per LoadTrackStreaming and
// reused by the seek-beyond-buffer re-fetch so the Range continuation requests the SAME artifact the
// initial stream did — a seek must never switch formats mid-track (the JS decoder, the cached setup
// header, and the byte offsets all belong to one artifact). Defaults to Lossless until a load resolves.
private AudioFormat _currentFormat = AudioFormat.Lossless;
// Phase 16 play-session telemetry (§2.1). The tracker observes the playback lifecycle and emits at
// most one bucketed play event per session, behind the engagement floor. Attached after construction
// by AudioPlayerProvider (the player is not DI-registered), mirroring how QueueService binds — no
// constructor growth propagated through DI, no construction cycle. Null when telemetry is not wired
// (e.g. unit tests that construct the player without it), so every call is null-guarded.
private PlayTracker? _playTracker;
private BeaconInterop? _beacon;
private DotNetObjectReference<StreamingAudioPlayerService>? _unloadRef;
private string? _unloadKey;
// One-shot guard so the play session opens exactly once per LoadTrackStreaming — never on the
// SeekBeyondBuffer re-stream, which reuses _currentTrackId and re-runs the playback-start transition
// with _streamingPlaybackStarted reset. A seek-beyond-buffer is the SAME play (§1d), so it must not
// open a new session. Set true when the session opens; reset only by LoadTrackStreaming.
private bool _sessionOpened;
/// <summary>
/// Creates the streaming player. The interop service and the media client are forwarded to the
/// <see cref="AudioPlayerService"/> base constructor; the logger is kept for this class's own
/// diagnostics. Play-session telemetry is attached separately via <see cref="AttachTracker"/>.
/// </summary>
/// <param name="audioInterop">JS audio interop used for decoding, scheduling and playback.</param>
/// <param name="trackMediaClient">HTTP client used to fetch track media, sidecars and waveform profiles.</param>
/// <param name="logger">Logger for streaming diagnostics.</param>
public StreamingAudioPlayerService(
    AudioInteropService audioInterop,
    TrackMediaClient trackMediaClient,
    ILogger<StreamingAudioPlayerService> logger)
    : base(audioInterop, trackMediaClient) => _logger = logger;
/// <summary>
/// Binds the play-session tracker and the beacon transport to this player after construction
/// (Phase 16 §2.1). <c>AudioPlayerProvider</c> calls this once. The binding is deliberately kept off
/// the constructor: the provider builds the player with <c>new</c> rather than through DI, so it hands
/// over an already-built tracker here, the same post-construction binding QueueService uses.
/// Also registers the page-unload callback so that closing the tab mid-play still records the play
/// via sendBeacon.
/// </summary>
/// <param name="tracker">The play-session tracker this player reports its lifecycle to.</param>
/// <param name="beacon">The beacon interop used to register the unload callback.</param>
public void AttachTracker(PlayTracker tracker, BeaconInterop beacon)
{
    // Build the unload plumbing first, then publish everything to the fields.
    var selfReference = DotNetObjectReference.Create(this);

    _playTracker = tracker;
    _beacon = beacon;
    _unloadRef = selfReference;
    _unloadKey = PlayerId;

    // Deliberately not awaited: registration never gates playback. If it fails, the only
    // consequence is that a tab-close mid-play is not recorded.
    _ = _beacon.RegisterUnloadAsync(_unloadKey, _unloadRef, nameof(OnPageUnload));
}
/// <summary>
/// JS-invokable callback that closes the open play session while the page is unloading
/// (pagehide / visibility→hidden). It runs synchronously from the beacon's unload handler so the
/// session's beacon is queued before the page freezes. <see cref="PlayTracker.Close"/> is
/// idempotent, so a later organic close does nothing. No-op when no tracker is attached.
/// </summary>
[JSInvokable]
public void OnPageUnload()
{
    _playTracker?.Close();
}
/// <summary>
/// Forwards each progress tick to the play-session tracker (§2.1) so it can advance its high-water
/// mark. A backward seek never lowers that mark — the tracker keeps the maximum. No-op when no
/// tracker is attached.
/// </summary>
/// <param name="currentTime">The playback position reported by the tick.</param>
protected override void OnProgressTick(double currentTime)
{
    _playTracker?.OnProgress(currentTime);
}
/// <summary>
/// Closes the play session when the stream ends organically; the recorded bucket reflects the
/// high-water fraction reached. No-op when no tracker is attached.
/// </summary>
protected override void OnPlaybackEnded()
{
    _playTracker?.Close();
}
/// <summary>
/// Selects <paramref name="track"/> for playback. This player always streams, so the call is
/// routed straight to <see cref="SelectTrackStreaming"/>.
/// </summary>
/// <param name="track">The track to select and start streaming.</param>
public override async Task SelectTrack(TrackDto track) => await SelectTrackStreaming(track);
/// <inheritdoc />
public async Task WarmAudioContext()
{
// The player must be initialized before its AudioContext can be touched.
await EnsureInitializedAsync();
// Ask the JS side to make this player's AudioContext ready; no track is loaded here.
await _audioInterop.EnsureAudioContextReady(PlayerId);
}
/// <summary>
/// Selects <paramref name="track"/> and streams it: initializes the player, readies the
/// AudioContext, raises the track-selected notification, runs the streaming load, then pushes a
/// final state notification. The load is awaited, so this returns only once
/// <c>LoadTrackStreaming</c> has finished, failed, or been cancelled.
/// </summary>
/// <param name="track">The track to select and stream.</param>
public async Task SelectTrackStreaming(TrackDto track)
{
await EnsureInitializedAsync();
// Resume AudioContext immediately on track selection (user interaction) to avoid clicks later
await _audioInterop.EnsureAudioContextReady(PlayerId);
await NotifyTrackSelected();
await LoadTrackStreaming(track);
// Push the state the load settled on. LoadTrackStreaming notifies in its own finally only while it
// is still the active operation, so this call also covers the case where it skipped that.
await NotifyStateChanged();
}
/// <inheritdoc />
public async Task StageTrack(TrackDto track)
{
// Pure state: expose the track as current so the bar shows it ready, but do NOT
// initialize the player, resume the AudioContext, or start streaming. Those steps
// require a user gesture and run on the first play click via SelectTrackStreaming.
CurrentTrack = track;
// Clear any error left over from a previous track so the staged track shows clean.
ErrorMessage = null;
await NotifyStateChanged();
}
/// <summary>
/// Loads <paramref name="track"/> from byte 0 and streams it into the JS player, awaiting the
/// streaming loop until the whole body has been delivered, fails, or is cancelled.
/// <para>
/// Sequence: reset to idle (which cancels and drains any previous loop), publish the track to the UI,
/// create this load's cancellation source, kick off the best-effort waveform fetch, resolve the
/// delivery format, fetch the media, initialize JS streaming, then run
/// <see cref="StreamAudioWithEarlyPlayback"/>.
/// </para>
/// <para>
/// Supersession: a seek or a newer load replaces <c>_streamingCancellation</c> and cancels this
/// load's source. Because <c>GetTrackMedia</c> reports a cancelled request as an unsuccessful result
/// rather than throwing, and the JS interop awaits are not cancellable, the token is re-checked after
/// each of those awaits so a superseded load unwinds through the cancellation handler instead of
/// surfacing a spurious error or starting a second streaming loop.
/// </para>
/// </summary>
/// <param name="track">The track to load; its <c>EntryKey</c> identifies the media to fetch.</param>
private async Task LoadTrackStreaming(TrackDto track)
{
    // Always start from a clean state. ResetToIdle both cancels and awaits any in-flight streaming
    // loop, so once it returns the previous loop has exited and cannot interleave
    // ProcessStreamingChunk calls against the single-instance JS StreamDecoder.
    await ResetToIdle();

    // Remembered for seek-beyond-buffer re-fetches.
    _currentTrackId = track.EntryKey;

    // A fresh load is a fresh play candidate (§1d: replays = multiple plays). Arm the one-shot
    // session-open guard; the session itself opens at the playback-start transition, so a track that
    // fails to load never counts.
    _sessionOpened = false;

    // Expose the track immediately so Now-Playing surfaces reflect it while it is still loading.
    CurrentTrack = track;

    // This load's cancellation source. Keeping it in a local lets the catch/finally compare identity
    // against _streamingCancellation: a seek swaps in its own source before this load's continuation
    // resumes on the single-threaded WASM dispatcher, and the seek's state must not be clobbered.
    var loadCts = new CancellationTokenSource();

    // Capture the token once. Whoever supersedes this load may dispose loadCts, after which the
    // CancellationTokenSource.Token getter throws ObjectDisposedException; the captured token struct
    // stays usable and still reports the cancellation.
    var loadToken = loadCts.Token;
    _streamingCancellation = loadCts;

    // Fetch the waveform profile alongside the audio. Fire-and-forget on the same token so a track
    // switch abandons it; it only updates display state and must never gate or fail the audio load.
    _ = LoadWaveformProfileAsync(track.EntryKey, loadToken);

    try
    {
        // Publish "loading started".
        ErrorMessage = null;
        LoadProgress = 0;
        IsLoading = true;
        IsStreamingMode = true;

        // Reset adaptive buffer sizing.
        _currentBufferSize = DefaultBufferSize;
        _consecutiveSlowReads = 0;

        await NotifyStateChanged();

        // Resolve the delivery format BEFORE requesting bytes (Phase 18, default policy OQ2). When
        // Opus is chosen the sidecar is injected into the JS player here, ahead of
        // InitializeStreaming (the 18.4 set-before-init contract). The result is kept so the
        // seek-beyond-buffer re-fetch requests the same artifact.
        _currentFormat = await ResolveStreamFormatAsync(track.EntryKey, loadToken);

        // The token reaches the HTTP layer so a navigation/track switch aborts the server
        // connection instead of leaving it draining bytes.
        var mediaResult = await _trackMediaClient.GetTrackMedia(
            track.EntryKey,
            byteOffset: 0,
            format: _currentFormat,
            cancellationToken: loadToken);

        if (!mediaResult.Success)
        {
            // A cancelled request comes back as Success == false rather than as an exception.
            // Treat it as the cancellation it is, not as a load error.
            loadToken.ThrowIfCancellationRequested();

            var technicalError = mediaResult.GetMessage();
            _logger.LogError("Failed to get track media for {TrackId}: {Error}",
                track.EntryKey, technicalError);
            ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
            return;
        }

        if (mediaResult.Value == null)
        {
            loadToken.ThrowIfCancellationRequested();

            const string technicalError = "No audio returned from server";
            _logger.LogError("No audio data returned for track {TrackId}", track.EntryKey);
            ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
            return;
        }

        using var audio = mediaResult.Value;

        // Superseded while the response was arriving: unwind (the using disposes the response)
        // before touching the JS player that the newer operation now owns.
        loadToken.ThrowIfCancellationRequested();

        // Initialize streaming with content length and media type (drives JS format-decoder selection).
        var streamingResult = await _audioInterop.InitializeStreaming(PlayerId, audio.ContentLength, audio.ContentType);

        // The interop await is not cancellable. If this load was superseded meanwhile, it must not
        // report an error or overwrite _activeStreamingTask with a second loop.
        loadToken.ThrowIfCancellationRequested();

        if (!streamingResult.Success)
        {
            var technicalError = $"Failed to initialize streaming: {streamingResult.Error}";
            _logger.LogError("Streaming initialization failed for track {TrackId}: {Error}",
                track.EntryKey, technicalError);
            ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
            return;
        }

        _activeStreamingTask = StreamAudioWithEarlyPlayback(audio, loadToken);
        await _activeStreamingTask;
    }
    catch (OperationCanceledException) when (loadCts.IsCancellationRequested)
    {
        // Expected when this load was superseded (track switch, seek, stop). The filter makes
        // HttpClient timeout OCEs — where loadCts was NOT cancelled — fall through to the error
        // handler below instead of being swallowed.
        _logger.LogDebug("Audio streaming cancelled for track {TrackId}", track.EntryKey);

        // Only reset streaming state if this load is still the active operation. A seek in flight
        // has already installed its own source and owns IsLoaded/IsStreamingMode.
        if (ReferenceEquals(_streamingCancellation, loadCts))
        {
            IsLoaded = false;
            IsStreamingMode = false;
        }
    }
    catch (Exception ex)
    {
        StreamingErrorHandler.LogError(_logger, ex, "LoadTrackStreaming", track.EntryKey);
        ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(ex.Message);
        LoadProgress = 0;
        IsLoaded = false;
        IsStreamingMode = false;
    }
    finally
    {
        IsLoading = false;

        // Only notify if this load is still the active operation. A superseding seek owns state
        // notifications; firing here mid-seek would push a stale snapshot.
        if (ReferenceEquals(_streamingCancellation, loadCts))
        {
            await NotifyStateChanged();
        }
    }
}
/// <summary>
/// Decides which delivery format a load requests (Phase 18 default policy, OQ2): Opus when the
/// browser can decode Ogg Opus AND the track has a usable sidecar, lossless in every other case.
/// When Opus wins, the sidecar is injected into the JS player here (set-before-init, the 18.4
/// contract) so the decoder has its setup header and seek index before <c>InitializeStreaming</c>
/// builds it.
/// <para>
/// This is the single, deliberately-overridable seam for the listener quality preference (wave 18.6).
/// An override honours the user's "streaming quality" toggle and otherwise falls through to this
/// capability-gated default, so it inherits both the capability gate (AC7) and the
/// sidecar-absent → lossless fallback (C2): a browser that cannot decode Opus, or a track with no
/// sidecar, always lands on lossless and plays.
/// </para>
/// </summary>
/// <param name="entryKey">Key of the track whose sidecar is looked up.</param>
/// <param name="cancellationToken">Token passed to the sidecar fetch.</param>
/// <returns><see cref="AudioFormat.Opus"/> when Opus can be decoded and the sidecar was injected; otherwise <see cref="AudioFormat.Lossless"/>.</returns>
protected virtual async Task<AudioFormat> ResolveStreamFormatAsync(string entryKey, CancellationToken cancellationToken)
{
    // Capability gate first (AC7): Ogg Opus is never handed to a browser that cannot decode it.
    var browserDecodesOpus = await _audioInterop.CanDecodeOggOpus();
    if (!browserDecodesOpus)
    {
        return AudioFormat.Lossless;
    }

    // Seeking an Opus stream needs the sidecar. A failed fetch or an empty payload means the track
    // has no Opus artifact yet (legacy / not backfilled / transcode failed); asking for lossless
    // directly keeps the request honest and avoids a wasted Opus-then-fallback round-trip.
    var sidecarResult = await _trackMediaClient.GetOpusSidecarAsync(entryKey, cancellationToken);
    if (!sidecarResult.Success)
    {
        return AudioFormat.Lossless;
    }

    if (sidecarResult.Value is not { Length: > 0 } payload)
    {
        return AudioFormat.Lossless;
    }

    // Inject BEFORE InitializeStreaming (the set-before-init contract).
    var injection = await _audioInterop.SetOpusSidecar(PlayerId, payload);
    if (injection.Success)
    {
        return AudioFormat.Opus;
    }

    // The bytes were not a usable sidecar — a malformed sidecar must never break playback.
    _logger.LogWarning("Opus sidecar for {EntryKey} failed to parse ({Error}); falling back to lossless.",
        entryKey, injection.Error);
    return AudioFormat.Lossless;
}
/// <summary>
/// Best-effort fetch of the track's waveform loudness profile. The current profile is cleared
/// first; on success the decoded profile is stored in
/// <see cref="AudioPlayerService.WaveformProfile"/> and a state notification is raised so the seek
/// zone re-renders with real bars. An unsuccessful result, a cancellation, or any exception leaves
/// the profile null, which the WaveformSeeker renders as a flat-but-seekable fallback. Never throws
/// into the load path.
/// </summary>
/// <param name="entryKey">Key of the track whose profile is requested.</param>
/// <param name="cancellationToken">Token of the owning load; once cancelled, a late result is discarded.</param>
private async Task LoadWaveformProfileAsync(string entryKey, CancellationToken cancellationToken)
{
    WaveformProfile = null;

    try
    {
        var response = await _trackMediaClient.GetWaveformProfileAsync(entryKey, cancellationToken);

        // Discard a result that arrived after the owning load was cancelled, and treat an
        // unsuccessful or empty result as "no profile".
        if (cancellationToken.IsCancellationRequested || !response.Success || response.Value is not { } profileDto)
        {
            return;
        }

        WaveformProfile = DecodeWaveformProfile(profileDto);
        await NotifyStateChanged();
    }
    catch (OperationCanceledException)
    {
        // Track switched or stopped before the profile arrived — nothing to surface.
    }
    catch (Exception ex)
    {
        // A failed profile fetch must not disturb playback; log and fall back to flat bars.
        _logger.LogDebug(ex, "Failed to load waveform profile for {EntryKey}", entryKey);
    }
}
/// <summary>
/// Converts a <see cref="WaveformProfileDto"/> payload (base64 text of one byte per bucket, each
/// 0..255) into loudness values normalized to [0, 1] by dividing every byte by 255.
/// </summary>
/// <param name="dto">The profile DTO whose <c>Data</c> holds the base64 payload.</param>
/// <returns>
/// One double per decoded byte, or null when the payload is missing, is not valid base64, or decodes
/// to zero bytes — callers treat null as "no profile" rather than rendering garbage bars.
/// </returns>
private static double[]? DecodeWaveformProfile(WaveformProfileDto dto)
{
    if (string.IsNullOrEmpty(dto.Data))
    {
        return null;
    }

    byte[] levels;
    try
    {
        levels = Convert.FromBase64String(dto.Data);
    }
    catch (FormatException)
    {
        // Malformed base64: report "no profile".
        return null;
    }

    return levels.Length == 0
        ? null
        : Array.ConvertAll(levels, static level => level / 255.0);
}
/// <summary>
/// Pumps the response stream of <paramref name="audio"/> into the JS decoder chunk by chunk. Playback
/// is started at the first chunk for which the decoder reports it can start, load progress is
/// published as bytes arrive, and reading is suspended while the scheduler reports back-pressure.
/// When the stream is exhausted the JS side is told the stream is complete.
/// </summary>
/// <param name="audio">The media response to read; its <c>ContentLength</c> drives progress reporting.</param>
/// <param name="cancellationToken">
/// Token of the owning load or seek. It is honoured by the stream read and by the back-pressure poll.
/// </param>
/// <exception cref="OperationCanceledException">The owning operation was cancelled; rethrown unchanged.</exception>
/// <exception cref="InvalidOperationException">The JS side reported that a chunk could not be processed.</exception>
private async Task StreamAudioWithEarlyPlayback(TrackMediaResponse audio, CancellationToken cancellationToken)
{
    byte[]? buffer = null;
    try
    {
        long totalBytesRead = 0;

        // Rent at the maximum size so adaptive sizing never outgrows the buffer.
        buffer = ArrayPool<byte>.Shared.Rent(MaxBufferSize);
        int currentBytes;
        var readTimer = System.Diagnostics.Stopwatch.StartNew();
        var bpDiagChunkIndex = 0; // [BP-DIAG] per-stream chunk counter for throttled logging

        do
        {
            readTimer.Restart();
            currentBytes = await audio.Stream.ReadAsync(buffer, 0, _currentBufferSize, cancellationToken);
            readTimer.Stop();

            // Adapt buffer size based on read performance.
            AdaptBufferSize(currentBytes, readTimer.ElapsedMilliseconds);

            if (currentBytes > 0)
            {
                totalBytesRead += currentBytes;

                // Always slice to the exact number of bytes read. The pooled buffer is rented at
                // MaxBufferSize and may carry stale bytes past currentBytes from a prior rental —
                // handing the full array to JS interop would serialise that garbage into the stream.
                var actualBuffer = buffer.AsSpan(0, currentBytes).ToArray();

                var chunkResult = await _audioInterop.ProcessStreamingChunk(PlayerId, actualBuffer);
                if (!chunkResult.Success)
                {
                    var error = $"Failed to process streaming chunk: {chunkResult.Error}";
                    _logger.LogWarning("Chunk processing failed: {Error}", error);

                    // A specific exception type instead of bare System.Exception; the message is
                    // unchanged and callers catch Exception, so handling is unaffected.
                    throw new InvalidOperationException(error);
                }

                // Mirror the decoder's view of the stream.
                CanStartStreaming = chunkResult.CanStartStreaming;
                HeaderParsed = chunkResult.HeaderParsed;
                BufferedChunks = chunkResult.BufferCount;

                // [BP-DIAG] Phase 21.4 — throttled per-chunk view of the back-pressure signal as the
                // C# loop sees it. If ProductionPaused never logs True while bytes keep flowing, the
                // break is upstream (JS latch / lookahead math); if it logs True but the transfer
                // still races to 100%, the break is the transport (browser buffered the whole body,
                // SetBrowserResponseStreamingEnabled not in effect). TEMPORARY.
                if (bpDiagChunkIndex % BpDiagChunkLogEvery == 0)
                {
                    _logger.LogInformation(
                        "[BP-DIAG] chunk #{Chunk} bytesRead={Bytes} totalRead={Total} bufferCount={BufCount} canStart={CanStart} productionPaused={Paused} isPaused={IsPaused}",
                        bpDiagChunkIndex, currentBytes, totalBytesRead, chunkResult.BufferCount,
                        chunkResult.CanStartStreaming, chunkResult.ProductionPaused, IsPaused);
                }
                bpDiagChunkIndex++;

                // Take the duration the decoder reports, once only.
                if (chunkResult.Duration.HasValue && Duration == null)
                {
                    Duration = chunkResult.Duration.Value;
                    _logger.LogInformation("Duration set from WAV header: {Duration:F2} seconds", Duration);

                    // Feed the same once-only duration to the play session so it can compute the
                    // completion fraction at close. Safe before/after session open — SetDuration is
                    // a no-op when no session is open and idempotent otherwise.
                    _playTracker?.SetDuration(chunkResult.Duration.Value);
                }

                // Start playback as soon as the decoder allows it.
                if (!_streamingPlaybackStarted && CanStartStreaming)
                {
                    var playbackResult = await _audioInterop.StartStreamingPlayback(PlayerId);
                    if (playbackResult.Success)
                    {
                        _streamingPlaybackStarted = true;
                        IsPlaying = true;
                        IsPaused = false;
                        IsLoaded = true; // Loaded and ready to play, even if still downloading.
                        ErrorMessage = null;

                        // Open the play session exactly once per load, at the moment playback truly
                        // begins (§2.1). The _sessionOpened guard keeps the seek-beyond-buffer
                        // re-stream — which re-enters this transition with _streamingPlaybackStarted
                        // reset — from opening a second session for the same play. Duration may
                        // already be known from a prior chunk, so it is re-fed after opening.
                        if (!_sessionOpened && _currentTrackId is { } trackKey)
                        {
                            _sessionOpened = true;
                            _playTracker?.OnPlaybackStarted(trackKey);
                            if (Duration is { } d)
                                _playTracker?.SetDuration(d);
                        }

                        await NotifyStateChanged(); // Immediate notification for critical state change
                    }
                    else
                    {
                        // Not fatal: the flag stays false, so the start is attempted again on the
                        // next chunk.
                        var technicalError = $"Failed to start streaming playback: {playbackResult.Error}";
                        _logger.LogError("Failed to start playback: {Error}", technicalError);
                        ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
                    }
                }

                // Progress is only computable when the server supplied a content length.
                if (audio.ContentLength > 0)
                {
                    LoadProgress = Math.Min(1.0, (double)totalBytesRead / audio.ContentLength);
                }

                await ThrottledNotifyStateChanged();

                // Phase 21.2a back-pressure (serves BOTH paths). The chunk just processed reported
                // that the scheduler's forward fill is over the high-water mark — stop reading the
                // socket so the unplayed decoded region stays bounded. Not calling ReadAsync lets
                // transport flow control throttle the sender. Poll until the fill drains below
                // low-water, then resume. The poll awaits on cancellationToken, so a track
                // switch/seek mid-pause throws OCE and unwinds through the existing drain
                // discipline (C6) — no separate cancellation path, no stale read racing a reinit.
                if (chunkResult.ProductionPaused)
                {
                    // [BP-DIAG] Phase 21.4 — ENTERING the pause-poll: reads have stopped, the socket
                    // should now stall and the transfer should plateau. If this line appears but the
                    // network transfer still completes, the transport is buffered. TEMPORARY.
                    _logger.LogInformation(
                        "[BP-DIAG] ENTER pause-poll at chunk #{Chunk} totalRead={Total} isPaused={IsPaused}",
                        bpDiagChunkIndex, totalBytesRead, IsPaused);
                    var bpDiagPollCount = 0;

                    // UC5: while the user is paused the playhead is frozen, so forward lookahead
                    // never shrinks. Hold here for as long as the user stays paused; once playback
                    // is running, hold until the scheduler stops reporting back-pressure.
                    // Cancellation is unchanged: a track switch/seek/stop while paused still throws
                    // OCE and unwinds through the existing drain discipline (C6).
                    while (IsPaused || await _audioInterop.IsProductionPaused(PlayerId))
                    {
                        cancellationToken.ThrowIfCancellationRequested();

                        // [BP-DIAG] Phase 21.4 — heartbeat every ~1 s (10 × 100 ms) so a held poll is
                        // visible without flooding. TEMPORARY.
                        if (bpDiagPollCount % 10 == 0)
                        {
                            _logger.LogInformation(
                                "[BP-DIAG] HOLD pause-poll iter={Iter} isPaused={IsPaused}",
                                bpDiagPollCount, IsPaused);
                        }
                        bpDiagPollCount++;

                        await Task.Delay(BackpressurePollMs, cancellationToken);
                    }

                    // [BP-DIAG] Phase 21.4 — EXITING the pause-poll and resuming ReadAsync. TEMPORARY.
                    _logger.LogInformation(
                        "[BP-DIAG] EXIT pause-poll at chunk #{Chunk} after {Iters} polls",
                        bpDiagChunkIndex, bpDiagPollCount);
                }
            }
        } while (currentBytes > 0);

        // Tell the JS decoder the stream is finished. When the server omits Content-Length the
        // StreamDecoder cannot determine completion by byte counting alone; this explicit signal
        // ensures the tail-decoding path (streamComplete=true) fires either way.
        await _audioInterop.MarkStreamCompleteAsync(PlayerId);

        // Mark as fully loaded.
        LoadProgress = 1.0;
        await NotifyStateChanged();
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // Cancellation is expected during track switch or seek — propagate cleanly.
        throw;
    }
    catch (Exception ex)
    {
        StreamingErrorHandler.LogError(_logger, ex, "StreamAudioWithEarlyPlayback");
        ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(ex.Message);
        LoadProgress = 0;
        IsLoaded = false;
        IsStreamingMode = false;
        await NotifyStateChanged();
        throw;
    }
    finally
    {
        // Return the pooled buffer on every exit path.
        if (buffer != null)
        {
            ArrayPool<byte>.Shared.Return(buffer);
        }
    }
}
/// <summary>
/// Stops playback. In streaming mode the audio data is consumed as it plays and cannot be replayed,
/// so stopping is the same as unloading: the player is fully reset to Idle.
/// </summary>
public override async Task Stop() => await ResetToIdle();
/// <summary>
/// Unloads the current track by fully resetting the player to Idle, leaving it ready for a new track.
/// </summary>
public override async Task Unload() => await ResetToIdle();
/// <summary>
/// Seeks to <paramref name="position"/> in streaming mode. The JS player is asked first: when the
/// target lies inside buffered audio the seek completes there; when it reports the target is beyond
/// the buffer (with a non-negative byte offset), a new stream is requested from that offset via
/// <see cref="SeekBeyondBuffer"/>. Does nothing unless a track is loaded in streaming mode. Failures
/// are surfaced through <c>ErrorMessage</c> rather than thrown.
/// </summary>
/// <param name="position">Target playback position, in seconds.</param>
public override async Task Seek(double position)
{
    var canSeek = IsLoaded && IsStreamingMode;
    if (!canSeek)
    {
        return;
    }

    try
    {
        var seekResult = await _audioInterop.SeekAsync(PlayerId, position);

        if (!seekResult.Success)
        {
            ErrorMessage = $"Seek error: {seekResult.Error}";
            await NotifyStateChanged();
            return;
        }

        if (seekResult.SeekBeyondBuffer && seekResult.ByteOffset >= 0)
        {
            // The target is not buffered: a new stream has to be loaded from the byte offset.
            _logger.LogInformation("Seeking beyond buffer to {Position:F2}s, byte offset: {ByteOffset}",
                position, seekResult.ByteOffset);
            await SeekBeyondBuffer(position, seekResult.ByteOffset);
            return;
        }

        // The seek was satisfied from buffered audio.
        CurrentTime = position;
        ErrorMessage = null;
        await NotifyStateChanged();
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error seeking to position {Position}", position);
        ErrorMessage = $"Error seeking: {ex.Message}";
        await NotifyStateChanged();
    }
}
/// <summary>
/// Handles a seek whose target is not in the buffer: cancels and drains the current streaming loop,
/// requests a new stream starting at <paramref name="byteOffset"/>, reinitializes the JS player for
/// offset streaming, and runs a new streaming loop until it finishes, fails, or is superseded.
/// <para>
/// Single-writer discipline (C6/AC8): this seek owns shared streaming state only while
/// <c>_streamingCancellation</c> is its own cancellation source. Failure exits check that identity
/// before running recovery, and the success path re-checks the token after each await that a newer
/// seek or a stop can overtake, so a superseded seek never reinitializes the player, resets the
/// streaming flags, or overwrites <c>_activeStreamingTask</c>.
/// </para>
/// </summary>
/// <param name="seekPosition">Target playback position, in seconds.</param>
/// <param name="byteOffset">Byte offset (resolved JS-side) at which the new stream must start.</param>
private async Task SeekBeyondBuffer(double seekPosition, long byteOffset)
{
    if (string.IsNullOrEmpty(_currentTrackId))
    {
        ErrorMessage = "Cannot seek - no track loaded";
        return;
    }

    IsSeekingBeyondBuffer = true;

    // Cancel the current streaming loop AND wait for it to fully exit before starting a new one. The
    // previous loop's pending ReadAsync throws OperationCanceledException asynchronously; starting a
    // new loop immediately would let both race against the single-instance JS StreamDecoder and
    // corrupt decode state. Draining here is the load-bearing guarantee.
    //
    // Invariant: any caller that supersedes a load WITHOUT wanting the load's state reset must
    // assign its own CTS to _streamingCancellation *before* its first await. LoadTrackStreaming's OCE
    // continuation fires during the drain await on the shared _activeStreamingTask; it resets
    // IsLoaded/IsStreamingMode only when _streamingCancellation still equals its loadCts. Assigning
    // seekCts synchronously here makes that identity check fail, so the seek's state survives.
    // (ResetToIdle deliberately does NOT do this — it wants the reset, and nulls
    // _streamingCancellation only after the drain.)
    var oldCts = _streamingCancellation;
    var seekCts = new CancellationTokenSource();

    // Capture the token once. A newer seek disposes this source after its own drain, after which
    // the CancellationTokenSource.Token getter throws ObjectDisposedException; the captured token
    // struct stays usable and still reports the cancellation.
    var seekToken = seekCts.Token;
    _streamingCancellation = seekCts;
    oldCts?.Cancel();
    await DrainActiveStreamingTaskAsync();
    oldCts?.Dispose();

    // All failure exits share the same guard. TrackMediaClient.GetTrackMedia swallows
    // OperationCanceledException and returns Success==false, so a superseded seek lands in the
    // media-fetch-fail branch below rather than in the OCE catch. Without the guard those branches
    // would call RecoverFromFailedRefill — running clearForSeek + setPlaybackOffset against the
    // player state the NEWER seek now owns. A local predicate keeps the exits symmetric.
    bool IsStillActiveSeek() => ReferenceEquals(_streamingCancellation, seekCts);

    try
    {
        // Update UI immediately.
        CurrentTime = seekPosition;
        await NotifyStateChanged();

        // Request the new stream from the offset, reusing the format the initial load resolved to
        // (_currentFormat): an Opus seek must come back as Opus bytes so the cached setup header and
        // the page-aligned byteOffset match the continuation. The offset itself is computed JS-side —
        // from the Opus seek index for Opus, from the WAV header for lossless (AC9 / §3.4a C).
        var mediaResult = await _trackMediaClient.GetTrackMedia(
            _currentTrackId,
            byteOffset,
            format: _currentFormat,
            cancellationToken: seekToken);

        if (!mediaResult.Success || mediaResult.Value == null)
        {
            var technicalError = mediaResult.GetMessage() ?? "Failed to load audio from position";
            _logger.LogError("Failed to get track media from offset {Offset}: {Error}", byteOffset, technicalError);

            // Guard: a superseded seek must NOT touch shared state. The newer seek owns teardown.
            if (IsStillActiveSeek())
            {
                await RecoverFromFailedRefill(seekPosition, StreamingErrorHandler.GetUserFriendlyMessage(technicalError));
            }
            else
            {
                _logger.LogDebug("Media-fetch failed on superseded seek to {Position} — newer seek owns state, skipping recovery", seekPosition);
            }
            return;
        }

        using var audio = mediaResult.Value;

        // Success-path guard: if this seek was cancelled while the response was arriving, unwind
        // (the using disposes the response) before reinitializing a player it no longer owns.
        seekToken.ThrowIfCancellationRequested();

        // Reinitialize the JS player for offset streaming.
        var reinitResult = await _audioInterop.ReinitializeFromOffset(PlayerId, audio.ContentLength, seekPosition);
        if (!reinitResult.Success)
        {
            _logger.LogError("Failed to reinitialize for offset streaming: {Error}", reinitResult.Error);

            // Guard: same single-writer discipline — only recover while still the active seek.
            if (IsStillActiveSeek())
            {
                await RecoverFromFailedRefill(seekPosition, "Failed to seek to position");
            }
            else
            {
                _logger.LogDebug("Reinit failed on superseded seek to {Position} — newer seek owns state, skipping recovery", seekPosition);
            }
            return;
        }

        // Success-path guard: the interop await above is not cancellable. If a newer seek or a stop
        // overtook this one meanwhile, resetting the flags below or overwriting _activeStreamingTask
        // would break the drain guarantee for whoever owns the state now.
        seekToken.ThrowIfCancellationRequested();

        // Reset streaming state for the new stream.
        _streamingPlaybackStarted = false;
        CanStartStreaming = false;
        HeaderParsed = false;
        BufferedChunks = 0;

        // Stream audio from the offset.
        _activeStreamingTask = StreamAudioWithEarlyPlayback(audio, seekToken);
        await _activeStreamingTask;

        IsSeekingBeyondBuffer = false;
    }
    catch (OperationCanceledException) when (seekCts.IsCancellationRequested)
    {
        // Another seek or a stop interrupted this one. Only clear the flag if this is still the
        // active seek — if _streamingCancellation has been replaced, a newer seek owns the flag.
        _logger.LogDebug("Seek beyond buffer cancelled");
        if (IsStillActiveSeek())
        {
            IsSeekingBeyondBuffer = false;
        }
    }
    catch (Exception ex)
    {
        // A refill fetch can fail deep into a long mix (the listener didn't initiate it). Recover
        // into a clean paused-but-loaded state (AC6) rather than leaving the starved scheduler to
        // fire a silent false end. Only while still the active seek — a superseding seek owns the
        // state and handles its own teardown.
        _logger.LogError(ex, "Error during seek beyond buffer to position {Position}", seekPosition);
        if (IsStillActiveSeek())
        {
            await RecoverFromFailedRefill(seekPosition, StreamingErrorHandler.GetUserFriendlyMessage(ex.Message));
        }
    }
}
/// <summary>
/// Clean-failure recovery for a window-miss refill (Phase 21.3 / AC6). When the re-fetch behind a
/// seek fails, the pre-seek loop has already been cancelled and drained, but the JS scheduler still
/// holds stale pre-seek buffers and is still "playing" — left alone it would drain them and fire a
/// silent false end. This asks the JS side to halt at <paramref name="seekPosition"/>, then settles
/// the C# state as paused-but-loaded with a clear error, so the listener can retry the seek or pick
/// another track. Mirrors <c>PlaybackScheduler.playFromPosition</c>'s end-of-buffer recovery: stop
/// pretending to play.
/// </summary>
/// <param name="seekPosition">The seek target, in seconds, at which the player is parked.</param>
/// <param name="userFacingError">The message surfaced through <c>ErrorMessage</c>.</param>
private async Task RecoverFromFailedRefill(double seekPosition, string userFacingError)
{
    // Halt the starved scheduler JS-side (stop sources, drop stale buffers, anchor at the target).
    // Best-effort: a failure here is only logged, and the C# state below is settled regardless.
    var haltResult = await _audioInterop.RecoverFromFailedRefill(PlayerId, seekPosition);
    if (haltResult.Success == false)
    {
        _logger.LogWarning("Refill-failure recovery interop did not succeed: {Error}", haltResult.Error);
    }

    // Matching recoverable C# state: error shown, not playing, paused at the target, still loaded.
    ErrorMessage = userFacingError;
    (IsPlaying, IsPaused) = (false, true);
    CurrentTime = seekPosition;
    IsSeekingBeyondBuffer = false;

    await NotifyStateChanged();
}
/// <summary>
/// Single method to reset all state - called by both Stop and Unload (and by
/// <see cref="DisposeAsync"/>). Runs a fixed teardown sequence: close the play session, cancel and
/// drain the streaming loop, dispose the cancellation source, ask JS to stop and unload, reset every
/// player/streaming property to its idle value, then raise one state-changed notification.
/// </summary>
/// <remarks>
/// The order of the steps is significant: the streaming loop is drained BEFORE the JS stop/unload
/// calls so that no pending chunk call from the loop lands after JS teardown.
/// </remarks>
private async Task ResetToIdle()
{
// 0. Close any open play session BEFORE tearing down (§2.1). ResetToIdle is the single funnel
// for stop / unload / dispose / track-switch (a new LoadTrackStreaming calls it first), so a
// superseded listen is recorded here with its high-water bucket. Close is idempotent — if the
// session already closed organically or via the unload beacon, this is a no-op.
_playTracker?.Close();
_sessionOpened = false;
// 1. Cancel any ongoing streaming operation and wait for it to exit
// before tearing down JS state. Otherwise the loop's pending
// ProcessStreamingChunk call can land after StopAsync/UnloadAsync.
// The cancellation source is disposed only after the drain has completed.
_streamingCancellation?.Cancel();
await DrainActiveStreamingTaskAsync();
_streamingCancellation?.Dispose();
_streamingCancellation = null;
// 2. Tell JS to stop and unload
try
{
await _audioInterop.StopAsync(PlayerId);
await _audioInterop.UnloadAsync(PlayerId);
}
catch
{
// Ignore JS errors during cleanup
// NOTE(review): both calls share this try, so an exception from StopAsync also skips
// UnloadAsync — confirm that is acceptable for the JS side.
}
// 3. Reset ALL state to Idle
IsPlaying = false;
IsPaused = false;
IsLoaded = false;
IsLoading = false;
CurrentTime = 0;
Duration = null;
LoadProgress = 0;
ErrorMessage = null;
CurrentTrack = null;
WaveformProfile = null;
// 4. Reset streaming-specific state
IsStreamingMode = false;
CanStartStreaming = false;
HeaderParsed = false;
BufferedChunks = 0;
_streamingPlaybackStarted = false;
IsSeekingBeyondBuffer = false;
_currentTrackId = null;
_currentFormat = AudioFormat.Lossless;
// Single notification after every field has been reset.
await NotifyStateChanged();
}
/// <summary>
/// Awaits the streaming loop captured in <see cref="_activeStreamingTask"/> until it has fully
/// exited. Callers are expected to have cancelled <see cref="_streamingCancellation"/> first.
/// Nothing is rethrown: the OperationCanceledException produced by that cancellation is expected,
/// and any other failure was already reported by the loop's own catch block.
/// </summary>
private async Task DrainActiveStreamingTaskAsync()
{
    // Snapshot the field so the identity check in the finally block compares against the task
    // we actually awaited, not whatever the field points at by then.
    var pending = _activeStreamingTask;
    if (pending is null)
    {
        return;
    }

    try
    {
        await pending;
    }
    catch
    {
        // Deliberately swallowed. Cancellation is the expected outcome because the caller
        // cancelled the loop itself; every other failure was already logged inside the loop.
    }
    finally
    {
        // A concurrent caller may have started a new stream while the old one was draining;
        // only clear the field when it still refers to the task awaited here.
        var stillCurrent = ReferenceEquals(_activeStreamingTask, pending);
        if (stillCurrent)
        {
            _activeStreamingTask = null;
        }
    }
}
/// <summary>
/// Raises a state-changed notification unless one was already raised less than
/// <see cref="NotificationThrottleMs"/> milliseconds ago, in which case the call is a no-op.
/// </summary>
private async Task ThrottledNotifyStateChanged()
{
    var timestamp = DateTime.UtcNow;
    var elapsedMs = (timestamp - _lastNotification).TotalMilliseconds;

    // Still inside the throttle window: drop this notification.
    if (elapsedMs < NotificationThrottleMs)
    {
        return;
    }

    // Record the send time before notifying so the next call measures from this notification.
    _lastNotification = timestamp;
    await NotifyStateChanged();
}
/// <summary>
/// Tears the player down on component unmount. The in-flight streaming loop must be cancelled and
/// the JS callbacks torn down before the JS side's setInterval fires again with a stale
/// DotNetObjectReference: <see cref="ResetToIdle"/> handles cancellation, the JS stop and the state
/// reset, after which the page-unload handler is detached and the base class disposes the JS player
/// and its callbacks.
/// </summary>
public override async ValueTask DisposeAsync()
{
    // Phase 1: full reset. ResetToIdle also closes any open play session, so a dispose that
    // happens mid-play still records the listen.
    try
    {
        await ResetToIdle();
    }
    catch
    {
        // Best-effort cleanup only — disposal must never throw.
    }

    // Phase 2: detach the page-unload handler so the torn-down circuit is never invoked.
    // Best-effort as well; the JS side tolerates an absent key.
    if (_unloadKey is not null && _beacon is not null)
    {
        try
        {
            await _beacon.UnregisterUnloadAsync(_unloadKey);
        }
        catch
        {
            /* best-effort */
        }
    }

    // Phase 3: release the self-reference, then hand over to the base implementation.
    _unloadRef?.Dispose();
    _unloadRef = null;

    await base.DisposeAsync();
}
/// <summary>
/// Adjusts <c>_currentBufferSize</c> from the outcome of the latest read. Three consecutive slow
/// reads (over 100 ms each) halve the buffer, never below <c>MinBufferSize</c>; a fast read
/// (under 20 ms) that filled the whole buffer doubles it, never above <c>MaxBufferSize</c>.
/// Any read that is not slow resets the slow-read streak.
/// </summary>
/// <param name="bytesRead">Number of bytes the read returned.</param>
/// <param name="readTimeMs">Duration of the read in milliseconds.</param>
private void AdaptBufferSize(int bytesRead, long readTimeMs)
{
    var isSlowRead = readTimeMs > 100;
    if (isSlowRead)
    {
        _consecutiveSlowReads++;

        var shouldShrink = _consecutiveSlowReads >= 3 && _currentBufferSize > MinBufferSize;
        if (shouldShrink)
        {
            // Slow connection: halve the chunk size and start a fresh streak.
            _currentBufferSize = Math.Max(MinBufferSize, _currentBufferSize / 2);
            _consecutiveSlowReads = 0;
        }

        return;
    }

    // Every non-slow read breaks the streak, whether or not the buffer grows.
    _consecutiveSlowReads = 0;

    var isFastFullRead = readTimeMs < 20 && bytesRead == _currentBufferSize;
    if (isFastFullRead && _currentBufferSize < MaxBufferSize)
    {
        // Fast connection that filled the whole buffer: double the chunk size.
        _currentBufferSize = Math.Min(MaxBufferSize, _currentBufferSize * 2);
    }
}
}