1298 lines
66 KiB
C#
1298 lines
66 KiB
C#
using DeepDrftModels.DTOs;
|
||
using DeepDrftModels.Enums;
|
||
using DeepDrftPublic.Client.Clients;
|
||
using System.Buffers;
|
||
using Microsoft.Extensions.Logging;
|
||
using Microsoft.JSInterop;
|
||
|
||
namespace DeepDrftPublic.Client.Services;
|
||
|
||
public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerService
|
||
{
|
||
// ── Chunking / notification tuning ──────────────────────────────────────────────────────────────
private const int DefaultBufferSize = 32 * 1024; // initial read size: 32 KB per chunk
private const int NotificationThrottleMs = 100; // UI updates are throttled to at most 10 per second

// Bounds for adaptive chunk sizing.
private const int MinBufferSize = 16 * 1024; // 16 KB floor
private const int MaxBufferSize = 64 * 1024; // 64 KB ceiling

// Back-pressure poll interval (Phase 21.2a). While the scheduler is above its forward high-water
// mark, the segment loop stops fetching the next segment and re-checks IsProductionPaused at this
// cadence until the fill drains below low-water. 100 ms is far shorter than the low-water lookahead
// (which is measured in seconds), so production resumes promptly relative to the playhead — no
// starvation (AC3) — and the poll stays cheap. The poll observes the loop's cancellation token, so
// a track switch or seek during a pause unwinds through the same drain discipline as one that
// lands during ReadAsync (C6).
private const int BackpressurePollMs = 100;

// Forward Range-segment size (Phase 21, Direction B). The forward stream is requested as a series
// of bounded "bytes=cursor-(cursor+SegmentSizeBytes-1)" 206 requests; the next one is issued only
// after the scheduler drains below low-water. Each request is bounded and fully consumed before the
// next is sent, so the browser fetch holds AT MOST roughly one segment of raw bytes whatever the
// file size. That network-memory bound is the point of the phase: the old open-ended single GET
// buffered the entire ~970 MB body in the browser even while reads were paused (the 21.4 finding).
// 4 MB trades request overhead (a 1 GB mix is ~250 segments) against memory: at the 30 s high-water
// mark a fast connection holds well under one segment of unplayed raw bytes, so the bound is the
// segment size rather than the decoded window. Tunable; not magic.
private const long SegmentSizeBytes = 4 * 1024 * 1024;

// Header-probe cap for the position-preserving format switch (Phase 18 wave 18.6, "load at
// timestamp"). When the listener changes streaming quality mid-track, the new-format stream starts
// DIRECTLY at the saved position instead of byte 0: the load resolves the byte offset for the
// target time in the freshly-initialized decoder and streams from there, never audibly playing the
// start. WAV byte-offset math needs the header, so the byte-0 segment is probed (fed to the decoder
// WITHOUT starting playback) until the header parses. This constant caps that probe so a corrupt
// stream whose header never appears cannot read unbounded. It matches the decoder's own
// header-search ceiling of 256 KB. Opus needs no probe — its sidecar resolves offsets right after init.
private const int MaxHeaderProbeBytes = 256 * 1024;

// Adaptive read-size state; both are reset at the start of every load.
private int _currentBufferSize = DefaultBufferSize;
private int _consecutiveSlowReads;

// ── Streaming state exposed to consumers ────────────────────────────────────────────────────────
public bool IsStreamingMode { get; private set; }
public bool CanStartStreaming { get; private set; }
public bool HeaderParsed { get; private set; }
public int BufferedChunks { get; private set; }
public bool IsSeekingBeyondBuffer { get; private set; }

// ── Internal streaming bookkeeping ──────────────────────────────────────────────────────────────
private bool _streamingPlaybackStarted;
private CancellationTokenSource? _streamingCancellation;
private Task? _activeStreamingTask;
private DateTime _lastNotification = DateTime.MinValue;
private readonly ILogger<StreamingAudioPlayerService> _logger;
private string? _currentTrackId;

// Delivery format the active load resolved to (Phase 18). Captured once per LoadTrackStreaming and
// reused by the seek-beyond-buffer re-fetch so the Range continuation asks for the SAME artifact as
// the initial stream. A seek must never change formats mid-track: the JS decoder, the cached setup
// header, and the byte offsets all belong to a single artifact. Lossless until a load resolves.
private AudioFormat _currentFormat = AudioFormat.Lossless;

// Play-session telemetry (Phase 16, §2.1). The tracker watches the playback lifecycle and emits at
// most one bucketed play event per session, behind the engagement floor. AudioPlayerProvider
// attaches it after construction (the player is not DI-registered), the same way QueueService
// binds — no constructor growth pushed through DI and no construction cycle. It stays null when
// telemetry is not wired (e.g. unit tests building the player without it), hence every call site
// is null-guarded.
private PlayTracker? _playTracker;
private BeaconInterop? _beacon;
private DotNetObjectReference<StreamingAudioPlayerService>? _unloadRef;
private string? _unloadKey;

// One-shot guard: the play session opens exactly once per LoadTrackStreaming and never on the
// SeekBeyondBuffer re-stream, which reuses _currentTrackId and re-runs the playback-start
// transition with _streamingPlaybackStarted reset. A seek-beyond-buffer is the SAME play (§1d) and
// must not open a new session. Set true when the session opens; reset only by LoadTrackStreaming.
private bool _sessionOpened;
|
||
|
||
/// <summary>
/// Builds the streaming player. <paramref name="audioInterop"/> and <paramref name="trackMediaClient"/>
/// are passed straight to the <see cref="AudioPlayerService"/> base constructor; only the logger is
/// retained by this class.
/// </summary>
/// <param name="audioInterop">Interop service forwarded to the base class.</param>
/// <param name="trackMediaClient">Media client forwarded to the base class.</param>
/// <param name="logger">Logger used for streaming diagnostics.</param>
public StreamingAudioPlayerService(
    AudioInteropService audioInterop,
    TrackMediaClient trackMediaClient,
    ILogger<StreamingAudioPlayerService> logger)
    : base(audioInterop, trackMediaClient)
    => _logger = logger;
|
||
|
||
/// <summary>
/// Binds the play-session tracker and the beacon transport to this player after it has been
/// constructed (Phase 16 §2.1). <c>AudioPlayerProvider</c> calls this once. It is intentionally not a
/// constructor parameter: the provider creates the player with <c>new</c> rather than through DI, so
/// a constructor dependency would make the provider resolve the tracker as well. Instead the provider
/// injects the tracker's collaborators and passes in a ready-made tracker — the same
/// post-construction binding QueueService uses. The page-unload handler is registered here too, so a
/// tab closed mid-play still records the play through sendBeacon.
/// </summary>
/// <param name="tracker">The play-session tracker to notify of lifecycle events.</param>
/// <param name="beacon">The beacon interop used to register the unload callback.</param>
public void AttachTracker(PlayTracker tracker, BeaconInterop beacon)
{
    _playTracker = tracker;
    _beacon = beacon;

    var callbackRef = DotNetObjectReference.Create(this);
    var registrationKey = PlayerId;
    _unloadRef = callbackRef;
    _unloadKey = registrationKey;

    // Deliberately not awaited: the registration merely has to land before the listener leaves and
    // it never gates playback. If it fails, the only consequence is that a mid-play tab close goes
    // unrecorded.
    _ = beacon.RegisterUnloadAsync(registrationKey, callbackRef, nameof(OnPageUnload));
}
|
||
|
||
/// <summary>
/// Page-unload callback (pagehide / visibility→hidden): closes the open play session, if a tracker
/// is attached. The beacon's unload handler invokes this synchronously so the session's beacon is
/// queued before the page freezes. <see cref="PlayTracker.Close"/> is idempotent, so an organic
/// close arriving later does nothing.
/// </summary>
[JSInvokable]
public void OnPageUnload()
{
    _playTracker?.Close();
}
|
||
|
||
// Progress hook (§2.1): forwards each tick's position to the play tracker, when one is attached,
// so it can advance the session's high-water mark. The tracker keeps the maximum, so a backward
// seek never lowers it.
protected override void OnProgressTick(double currentTime)
{
    _playTracker?.OnProgress(currentTime);
}
|
||
|
||
// End-of-stream hook: an organic finish closes the play session (no-op without a tracker). The
// reported bucket reflects the high-water fraction the listener reached.
protected override void OnPlaybackEnded()
{
    _playTracker?.Close();
}
|
||
|
||
/// <summary>
/// Selects <paramref name="track"/> for playback. This player always streams, so the call is
/// routed to <see cref="SelectTrackStreaming"/>.
/// </summary>
public override async Task SelectTrack(TrackDto track) => await SelectTrackStreaming(track);
|
||
|
||
/// <inheritdoc />
public async Task WarmAudioContext()
{
    // Step 1: run the initialization gate (EnsureInitializedAsync is defined outside this view;
    // presumably idempotent — confirm against the base class).
    await EnsureInitializedAsync();

    // Step 2: ask the interop layer to ready the AudioContext belonging to this player.
    await _audioInterop.EnsureAudioContextReady(PlayerId);
}
|
||
|
||
/// <summary>
/// Selects <paramref name="track"/> and streams it: initializes the player, readies the
/// AudioContext, announces the selection, then runs the streaming load and publishes the
/// resulting state.
/// </summary>
/// <param name="track">The track to load and stream.</param>
public async Task SelectTrackStreaming(TrackDto track)
{
    await EnsureInitializedAsync();

    // Track selection is a user interaction, so resume the AudioContext right away; doing it now
    // rather than later avoids clicks.
    await _audioInterop.EnsureAudioContextReady(PlayerId);

    await NotifyTrackSelected();

    // The load awaits the streaming loop itself; state is published once it returns.
    await LoadTrackStreaming(track);

    await NotifyStateChanged();
}
|
||
|
||
/// <inheritdoc />
public async Task StageTrack(TrackDto track)
{
    // State only. The track becomes current so the bar can show it as ready, but the player is NOT
    // initialized, the AudioContext is NOT resumed, and no streaming starts. Those steps need a
    // user gesture and happen on the first play click, through SelectTrackStreaming.
    CurrentTrack = track;

    // Clear any error message left on the player state.
    ErrorMessage = null;

    await NotifyStateChanged();
}
|
||
|
||
/// <inheritdoc />
public async Task ReloadPreservingPositionAsync()
{
    // With nothing playing there is nothing to switch; the new preference simply applies to the
    // next play.
    var track = CurrentTrack;
    if (track is null || !IsStreamingMode)
    {
        return;
    }

    // Snapshot the playhead BEFORE the reload resets streaming state.
    var savedPosition = CurrentTime;

    await EnsureInitializedAsync();
    await _audioInterop.EnsureAudioContextReady(PlayerId);
    await NotifyTrackSelected();

    // Within the first second there is nothing worth preserving: a plain restart from byte 0 in
    // the new format is simpler and skips a needless seek-offset resolution. Past that, the load
    // begins DIRECTLY at the saved position, so the start of the track is never audibly played.
    double? startAt = savedPosition > 1.0 ? savedPosition : null;

    // LoadTrackStreaming runs the entire forward segment loop, so this is the last meaningful
    // await — the caller already invokes this method fire-and-forget.
    await LoadTrackStreaming(track, startPosition: startAt);

    await NotifyStateChanged();
}
|
||
|
||
/// <summary>
/// Loads <paramref name="track"/> in streaming mode: resets the player, resolves the delivery format,
/// fetches the first bounded Range segment, initializes the JS decoder with the TOTAL file length, and
/// then pumps the stream — from byte 0, or directly from <paramref name="startPosition"/> when given.
/// The returned task completes only when the streaming loop finishes, is cancelled, or fails.
/// </summary>
/// <param name="track">The track to stream; its <c>EntryKey</c> identifies the media to request.</param>
/// <param name="startPosition">Optional playback position (seconds) to start at instead of the
/// beginning; <c>null</c> streams from byte 0.</param>
/// <remarks>
/// Failures are not rethrown: they are surfaced through <c>ErrorMessage</c> and the state flags. Shared
/// state is only written in the catch/finally blocks while this load's CTS is still the active
/// <c>_streamingCancellation</c>, so a superseding seek or track switch is never clobbered.
/// </remarks>
private async Task LoadTrackStreaming(TrackDto track, double? startPosition = null)
{
    // Always reset to a clean state before loading a new track. ResetToIdle both cancels and awaits
    // any in-flight streaming loop, so once it returns the previous loop is guaranteed to have exited
    // and there is no risk of interleaved ProcessStreamingChunk calls against the single-instance JS
    // StreamDecoder.
    await ResetToIdle();

    // Save the track ID for seek operations.
    _currentTrackId = track.EntryKey;

    // A fresh load is a fresh play candidate (§1d: replays = multiple plays). Arm the one-shot
    // session-open guard; the session actually opens at the playback-start transition (a track that
    // fails to load never reaches it, so it does not count).
    _sessionOpened = false;

    // Expose to the UI immediately — Now-Playing surfaces should reflect the selected track while it
    // is still loading, not only after playback starts.
    CurrentTrack = track;

    // Create a new cancellation source for this streaming operation. It is captured in a local so the
    // catch/finally can compare identity against _streamingCancellation: a seek replaces
    // _streamingCancellation with its own seekCts before this load's continuation resumes on the
    // single-threaded WASM dispatcher, and this load must not clobber the seek's state.
    var loadCts = new CancellationTokenSource();
    _streamingCancellation = loadCts;

    // Fetch the waveform profile alongside the audio. Fire-and-forget against the same streaming
    // token so a track switch abandons it; it only updates display state and must never gate or fail
    // the audio load (a missing profile yields the flat-seekbar fallback).
    _ = LoadWaveformProfileAsync(track.EntryKey, loadCts.Token);

    try
    {
        // Mark loading as started.
        ErrorMessage = null;
        LoadProgress = 0;
        IsLoading = true;
        IsStreamingMode = true;

        // Reset adaptive buffer sizing.
        _currentBufferSize = DefaultBufferSize;
        _consecutiveSlowReads = 0;

        await NotifyStateChanged();

        // Resolve the delivery format for this load BEFORE requesting bytes (Phase 18, default policy
        // OQ2). When Opus is chosen the sidecar is fetched and injected into the JS player here, ahead
        // of InitializeStreaming, honouring the 18.4 set-before-init contract. The result is captured
        // so the seek-beyond-buffer re-fetch reuses the same artifact.
        _currentFormat = await ResolveStreamFormatAsync(track.EntryKey, loadCts.Token);

        // Direction B: fetch the FIRST bounded segment to learn the total file length and the content
        // type. The 206 Content-Range carries the total; the segment loop advances its cursor toward
        // it. The decoder is initialized with the TOTAL length (not the segment length) so a bounded
        // segment's small Content-Length never trips its byte-count completion early — segment
        // boundaries are invisible to the decoder, which sees one continuous in-order byte stream.
        // Passing the streaming token aborts the server connection on a navigation/track switch
        // instead of leaving it draining bytes.
        var firstSegment = await _trackMediaClient.GetTrackMedia(
            track.EntryKey,
            byteOffset: 0,
            byteEnd: SegmentSizeBytes - 1,
            format: _currentFormat,
            cancellationToken: loadCts.Token);
        if (!firstSegment.Success)
        {
            var technicalError = firstSegment.GetMessage();
            _logger.LogError("Failed to get track media for {TrackId}: {Error}",
                track.EntryKey, technicalError);
            ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
            return;
        }

        if (firstSegment.Value == null)
        {
            const string technicalError = "No audio returned from server";
            _logger.LogError("No audio data returned for track {TrackId}", track.EntryKey);
            ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
            return;
        }

        // This method owns the first segment until it is handed to a pump below
        // (StartFromPositionAsync / RunSegmentedStreamAsync), which then disposes it. No `using`
        // here — that would double-dispose after the handoff.
        var audio = firstSegment.Value;

        // The total logical length the decoder must see. On a 206 the Content-Range carries it; a 200
        // (server ignored Range / file ≤ one segment) has no Content-Range, so fall back to the body's
        // own Content-Length — that body IS the whole file in that case.
        var totalLength = audio.TotalLength ?? audio.ContentLength;

        // Initialize streaming mode with the TOTAL length and media type (drives JS format-decoder
        // selection). The segment has not been handed off yet, so ANY non-success exit from this step
        // — a failure result or an exception thrown by the interop call — must release it here;
        // otherwise its connection would leak. The flag + finally disposes it exactly once.
        var decoderReady = false;
        try
        {
            var streamingResult = await _audioInterop.InitializeStreaming(PlayerId, totalLength, audio.ContentType);
            if (!streamingResult.Success)
            {
                var technicalError = $"Failed to initialize streaming: {streamingResult.Error}";
                _logger.LogError("Streaming initialization failed for track {TrackId}: {Error}",
                    track.EntryKey, technicalError);
                ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
                return;
            }

            decoderReady = true;
        }
        finally
        {
            if (!decoderReady)
            {
                audio.Dispose();
            }
        }

        if (startPosition is { } startAt)
        {
            // "Load at timestamp" (Phase 18 wave 18.6 format switch): begin the stream DIRECTLY at
            // startAt rather than byte 0, so the listener never hears the track restart from the
            // beginning. The byte-0 segment in hand is used only to parse the header for byte-offset
            // math (WAV) — Opus resolves the offset from its sidecar with no probe — and then a fresh
            // segment is fetched from the resolved offset and pumped via the shared seek/refill loop.
            await StartFromPositionAsync(track.EntryKey, audio, totalLength, startAt, loadCts.Token);
        }
        else
        {
            // Forward segmentation from byte 0. The first segment is already in hand; the loop pumps
            // it, then fetches subsequent bounded segments gated on the scheduler fill signal.
            _activeStreamingTask = RunSegmentedStreamAsync(
                track.EntryKey, audio, cursor: 0, totalLength, seekPosition: null, loadCts.Token);
            await _activeStreamingTask;
        }
    }
    catch (OperationCanceledException) when (loadCts.IsCancellationRequested)
    {
        // Cancellation is expected when this load was superseded (track switch or seek). The when
        // filter ensures HttpClient timeout OCEs — where loadCts was NOT cancelled — fall through to
        // the error handler below instead of being swallowed.
        _logger.LogDebug("Audio streaming cancelled for track {TrackId}", track.EntryKey);

        // Only reset streaming state if this load is still the active operation. A seek in flight has
        // already replaced _streamingCancellation with its own seekCts and owns
        // IsLoaded/IsStreamingMode; clobbering them here corrupts the seek mid-flight.
        if (ReferenceEquals(_streamingCancellation, loadCts))
        {
            IsLoaded = false;
            IsStreamingMode = false;
        }
    }
    catch (Exception ex)
    {
        StreamingErrorHandler.LogError(_logger, ex, "LoadTrackStreaming", track.EntryKey);
        var userError = StreamingErrorHandler.GetUserFriendlyMessage(ex.Message);

        // Mid-stream failure (playback was already underway): halt the JS scheduler into a clean
        // paused-but-loaded state exactly as the seek path does via RecoverFromFailedRefill, rather
        // than resetting to unloaded and letting the scheduler's buffered tail drain into a silent
        // false end (AC6). Apply only when this load is still the active operation — a superseding
        // seek owns state and has already replaced _streamingCancellation with its own CTS.
        if (_streamingPlaybackStarted && ReferenceEquals(_streamingCancellation, loadCts))
        {
            await RecoverFromFailedRefill(CurrentTime, userError);
        }
        else if (ReferenceEquals(_streamingCancellation, loadCts))
        {
            // First-segment failure (nothing buffered / playing yet), still the active operation:
            // the normal unload-to-error path is correct — nothing is in the scheduler to halt.
            ErrorMessage = userError;
            LoadProgress = 0;
            IsLoaded = false;
            IsStreamingMode = false;
        }
        else
        {
            // Superseded load: a newer seek (or track switch) has already claimed
            // _streamingCancellation and owns all shared state. Writing IsLoaded/IsStreamingMode here
            // would corrupt the live operation — mirror the OCE catch's identity guard and do nothing
            // to shared state.
            _logger.LogDebug("Generic throw on superseded load for track {TrackId} — newer operation owns state, skipping unload", track.EntryKey);
        }
    }
    finally
    {
        IsLoading = false;

        // Only notify if this load is still the active operation. A superseding seek owns state
        // notifications; firing here mid-seek would push a stale snapshot.
        if (ReferenceEquals(_streamingCancellation, loadCts))
        {
            await NotifyStateChanged();
        }
    }
}
|
||
|
||
/// <summary>
/// Starts the freshly-initialized track DIRECTLY at <paramref name="startPosition"/> rather than at
/// byte 0 (Phase 18 wave 18.6 — the position-preserving format switch). <c>InitializeStreaming</c> has
/// already built the decoder; this method resolves the file-absolute byte offset for the target time
/// and then joins the shared seek/refill loop (<see cref="RunSegmentedStreamAsync"/> with a non-null
/// seekPosition), which reinitializes the decoder for a header-less Range continuation and starts
/// playback at the target — nothing is ever audibly played from the start.
/// <para>
/// Opus resolves the offset from its sidecar at once. WAV needs its header first, so the byte-0 segment
/// already in hand is fed to the decoder (WITHOUT starting playback) until the header parses, and the
/// offset is resolved again. That probe is bounded by <see cref="MaxHeaderProbeBytes"/>. The byte-0
/// segment is disposed as soon as the header is in hand; the continuation is a fresh fetch from the
/// resolved offset.
/// </para>
/// </summary>
/// <param name="trackId">Entry key of the track being streamed.</param>
/// <param name="headerSegment">The already-fetched byte-0 segment; this method always disposes it.</param>
/// <param name="totalLength">Total file length reported by the byte-0 response. Not read by this
/// method's body — the continuation's EOF boundary is taken from the continuation response.</param>
/// <param name="startPosition">Target playback position in seconds.</param>
/// <param name="cancellationToken">Token of the owning load.</param>
/// <exception cref="Exception">The offset could not be resolved, or the continuation segment could
/// not be fetched. The catch in LoadTrackStreaming settles the error state.</exception>
private async Task StartFromPositionAsync(
    string trackId,
    TrackMediaResponse headerSegment,
    long totalLength,
    double startPosition,
    CancellationToken cancellationToken)
{
    // A resolution is usable only when it succeeded AND reports a seek-beyond-buffer target.
    static bool IsUnresolved(SeekResult candidate) => !candidate.Success || !candidate.SeekBeyondBuffer;

    // First attempt answers immediately for Opus (sidecar). WAV reports failure until its header has
    // been parsed, so the byte-0 segment is probed and the resolution retried. The finally releases
    // the byte-0 segment once it has served its purpose (header probe, or nothing at all for Opus) —
    // even when the probe throws — so its socket never leaks ahead of the continuation fetch.
    SeekResult offsetResult;
    try
    {
        offsetResult = await _audioInterop.ResolveStreamOffsetAsync(PlayerId, startPosition);
        if (IsUnresolved(offsetResult))
        {
            await ProbeHeaderAsync(headerSegment, cancellationToken);
            offsetResult = await _audioInterop.ResolveStreamOffsetAsync(PlayerId, startPosition);
        }
    }
    finally
    {
        headerSegment.Dispose();
    }

    if (IsUnresolved(offsetResult))
    {
        // Still no offset after probing: the stream cannot serve a positioned start. Fail loudly
        // instead of silently restarting from 0, which would break the "preserve position" contract
        // the listener asked for.
        throw new Exception(offsetResult.Error ?? "Could not resolve a stream offset for the requested position");
    }

    // Request the FIRST bounded segment at the resolved offset, in the format this load already
    // resolved to. It is pumped through the shared loop exactly as a seek-beyond-buffer would be:
    // the reinit for the header-less continuation happens inside that loop, and playback begins at
    // startPosition.
    var byteOffset = offsetResult.ByteOffset;
    var continuation = await _trackMediaClient.GetTrackMedia(
        trackId,
        byteOffset,
        byteEnd: byteOffset + SegmentSizeBytes - 1,
        format: _currentFormat,
        cancellationToken: cancellationToken);
    if (!continuation.Success || continuation.Value == null)
    {
        var failureDetail = continuation.GetMessage() ?? "Failed to load audio from position";
        _logger.LogError("Failed to get track media from offset {Offset} for {TrackId}: {Error}",
            byteOffset, trackId, failureDetail);
        throw new Exception(failureDetail);
    }

    var continuationAudio = continuation.Value;

    // Absolute EOF boundary for the segment loop. A 206 carries the file total in Content-Range; a
    // 200 (single-segment tail) has none, so use the offset plus this body's own length.
    var continuationTotal = continuationAudio.TotalLength ?? (byteOffset + continuationAudio.ContentLength);

    // The positioned stream has not started yet: arm a fresh playback-start transition.
    _streamingPlaybackStarted = false;
    CanStartStreaming = false;
    BufferedChunks = 0;

    // Show the landing position right away so the seek bar sits at the right spot while the first
    // post-offset buffers decode.
    CurrentTime = startPosition;

    _activeStreamingTask = RunSegmentedStreamAsync(
        trackId, continuationAudio, cursor: byteOffset, continuationTotal, seekPosition: startPosition, cancellationToken);
    await _activeStreamingTask;
}
|
||
|
||
/// <summary>
/// Feeds bytes from the byte-0 <paramref name="segment"/> into the decoder until its header parses
/// (<see cref="HeaderParsed"/>), WITHOUT starting playback — the WAV byte-offset math needs the header
/// before <see cref="AudioInteropService.ResolveStreamOffsetAsync"/> can answer. At most
/// <see cref="MaxHeaderProbeBytes"/> bytes are read: every read is clamped to the remaining probe
/// budget, so a stream that never yields a header cannot read past the cap. The decoded buffers this
/// queues are dropped by the subsequent Range-continuation reinit (clearForSeek), so nothing is
/// audible and nothing leaks. Opus never reaches here (its offset resolves pre-probe).
/// </summary>
/// <param name="segment">The byte-0 segment to read from. Not disposed here; the caller owns it.</param>
/// <param name="cancellationToken">Token observed by each stream read.</param>
/// <exception cref="Exception">The decoder rejected a probe chunk.</exception>
private async Task ProbeHeaderAsync(TrackMediaResponse segment, CancellationToken cancellationToken)
{
    var buffer = ArrayPool<byte>.Shared.Rent(MaxBufferSize);
    try
    {
        var probed = 0;
        while (!HeaderParsed && probed < MaxHeaderProbeBytes)
        {
            // Never request more than the budget that is left; the loop condition guarantees the
            // remaining budget is at least one byte, so this is always a positive count.
            var bytesToRequest = Math.Min(_currentBufferSize, MaxHeaderProbeBytes - probed);
            var read = await segment.Stream.ReadAsync(buffer, 0, bytesToRequest, cancellationToken);
            if (read <= 0)
            {
                // Segment exhausted before a header appeared — let the caller surface the failure.
                break;
            }

            probed += read;

            // Slice to the exact bytes read (the pooled buffer may carry stale tail bytes).
            var chunk = buffer.AsSpan(0, read).ToArray();
            var result = await _audioInterop.ProcessStreamingChunk(PlayerId, chunk);
            if (!result.Success)
            {
                throw new Exception($"Failed to process header probe chunk: {result.Error}");
            }

            HeaderParsed = result.HeaderParsed;

            // Capture the once-only duration the header yields so the UI and play session have it.
            if (result.Duration.HasValue && Duration == null)
            {
                Duration = result.Duration.Value;
                _playTracker?.SetDuration(result.Duration.Value);
            }
        }
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(buffer);
    }
}
|
||
|
||
/// <summary>
/// Resolves which delivery format this load should request (Phase 18 default policy, OQ2): Opus when the
/// browser can decode Ogg Opus AND a sidecar exists for the track, otherwise lossless. When Opus is
/// chosen the sidecar is injected into the JS player here (set-before-init, the 18.4 contract) so the
/// decoder has its setup header + seek index before <c>InitializeStreaming</c> builds it.
/// <para>
/// This is the single, deliberately-overridable seam for the listener quality preference (wave 18.6).
/// 18.6 overrides this to honour the user's "streaming quality" toggle — returning lossless when the
/// listener picked it, and otherwise falling through to this capability-gated default. The capability
/// gate (AC7) and the sidecar-absent → lossless fallback (C2) stay here so any override inherits both:
/// a browser that cannot decode Opus, or a track with no sidecar, always lands on lossless and plays.
/// </para>
/// </summary>
/// <param name="entryKey">Entry key of the track whose format is being resolved.</param>
/// <param name="cancellationToken">Token of the owning load. Passed to the sidecar fetch and checked
/// again before the sidecar is injected into the JS player.</param>
/// <returns><see cref="AudioFormat.Opus"/> when the sidecar was fetched and injected successfully;
/// <see cref="AudioFormat.Lossless"/> in every fallback case.</returns>
/// <exception cref="OperationCanceledException">The token was cancelled before the sidecar could be
/// injected.</exception>
protected virtual async Task<AudioFormat> ResolveStreamFormatAsync(string entryKey, CancellationToken cancellationToken)
{
    // Capability gate first (AC7): never hand Ogg Opus to a browser that cannot decode it.
    if (!await _audioInterop.CanDecodeOggOpus())
    {
        return AudioFormat.Lossless;
    }

    // The sidecar must be present (and parseable by the JS decoder) to seek an Opus stream. Its absence
    // means the track has no Opus artifact yet (legacy / not backfilled / transcode failed) — request
    // lossless rather than Opus-without-a-sidecar (the server would C2-fall-back anyway, but asking for
    // lossless keeps the request honest and avoids a wasted Opus-then-fallback round-trip).
    var sidecar = await _trackMediaClient.GetOpusSidecarAsync(entryKey, cancellationToken);
    if (!sidecar.Success || sidecar.Value is not { Length: > 0 } sidecarBytes)
    {
        return AudioFormat.Lossless;
    }

    // The fetch may complete after this load was superseded. The injection below writes into the
    // player identified by PlayerId, which a newer load would also be using, so a cancelled load must
    // stop here instead of pushing its own track's sidecar into it. The resulting
    // OperationCanceledException is handled by LoadTrackStreaming's cancellation catch.
    cancellationToken.ThrowIfCancellationRequested();

    // Inject BEFORE InitializeStreaming (the set-before-init contract). A parse failure here means the
    // bytes are not a usable sidecar — fall back to lossless so a malformed sidecar never breaks playback.
    var injected = await _audioInterop.SetOpusSidecar(PlayerId, sidecarBytes);
    if (!injected.Success)
    {
        _logger.LogWarning("Opus sidecar for {EntryKey} failed to parse ({Error}); falling back to lossless.",
            entryKey, injected.Error);
        return AudioFormat.Lossless;
    }

    return AudioFormat.Opus;
}
|
||
|
||
/// <summary>
/// Best-effort fetch of the track's waveform loudness profile. On success the decoded profile is
/// stored and a state notification is raised so the seek zone can re-render with real bars. A 404 (no
/// stored profile) or any other failure just leaves <see cref="AudioPlayerService.WaveformProfile"/>
/// null, which the WaveformSeeker draws as a flat-but-seekable fallback. Exceptions never escape into
/// the load path.
/// </summary>
/// <param name="entryKey">Entry key of the track whose profile is requested.</param>
/// <param name="cancellationToken">Streaming token of the owning load; cancellation abandons the
/// fetch without touching state.</param>
private async Task LoadWaveformProfileAsync(string entryKey, CancellationToken cancellationToken)
{
    // Clear any previously held profile before the request goes out.
    WaveformProfile = null;

    try
    {
        var response = await _trackMediaClient.GetWaveformProfileAsync(entryKey, cancellationToken);

        // The token may have been cancelled while the request was in flight; if so, drop the result.
        if (cancellationToken.IsCancellationRequested)
        {
            return;
        }

        // No usable payload: keep the null profile and skip the notification.
        if (!response.Success || response.Value is not { } profileDto)
        {
            return;
        }

        WaveformProfile = DecodeWaveformProfile(profileDto);
        await NotifyStateChanged();
    }
    catch (OperationCanceledException)
    {
        // The track was switched or stopped before the profile arrived — nothing to surface.
    }
    catch (Exception ex)
    {
        // A failed profile fetch must not disturb playback: log it and fall back to flat bars.
        _logger.LogDebug(ex, "Failed to load waveform profile for {EntryKey}", entryKey);
    }
}
|
||
|
||
/// <summary>
/// Turns a <see cref="WaveformProfileDto"/> (base64 of byte[BucketCount], each value 0..255) into a
/// normalized double[] whose entries lie in [0, 1].
/// </summary>
/// <param name="dto">The profile payload; only its <c>Data</c> string is read.</param>
/// <returns>One normalized level per decoded byte, or <c>null</c> when the payload is missing, is not
/// valid base64, or decodes to zero bytes — so callers treat it as "no profile" instead of drawing
/// garbage bars.</returns>
private static double[]? DecodeWaveformProfile(WaveformProfileDto dto)
{
    if (string.IsNullOrEmpty(dto.Data))
    {
        return null;
    }

    byte[] raw;
    try
    {
        raw = Convert.FromBase64String(dto.Data);
    }
    catch (FormatException)
    {
        // Malformed base64 is treated the same as an absent profile.
        return null;
    }

    // Scale each 0..255 level into [0, 1]; an empty payload counts as "no profile".
    return raw.Length == 0
        ? null
        : Array.ConvertAll(raw, level => level / 255.0);
}
|
||
|
||
/// <summary>
/// Phase 21 Direction B — the one segmented forward read loop, used by both the initial load and the
/// seek/refill path so there is a single cursor and a single fetch mechanism (C1/C5). The caller
/// supplies the already-fetched first segment; after it is pumped, each further bounded
/// <c>bytes=cursor-(cursor+SegmentSizeBytes-1)</c> segment is requested only once the back-pressure
/// gate releases, until the cursor reaches <paramref name="totalLength"/>.
/// <para>
/// Every segment is bounded and fully consumed before the next request, so at most ~one segment of
/// raw bytes is resident at a time (the network-memory bound), while the decoder still receives one
/// continuous in-order byte stream across segment boundaries — no per-segment reinit.
/// </para>
/// <para>
/// On a non-cancellation failure the method logs, sets <c>ErrorMessage</c>, resets
/// <c>LoadProgress</c>/<c>IsLoaded</c>/<c>IsStreamingMode</c>, notifies, and rethrows. Cancellation is
/// rethrown without touching that state.
/// </para>
/// </summary>
/// <param name="trackId">Id of the track whose follow-up segments are fetched.</param>
/// <param name="firstSegment">The already-fetched first segment (starting at byte
/// <paramref name="cursor"/>). Owned by this method, which disposes it; subsequent segments are fetched
/// and disposed inline.</param>
/// <param name="cursor">File-absolute byte offset the first segment starts at (0 for a fresh load,
/// the resolved seek offset for a refill).</param>
/// <param name="totalLength">Total file length in bytes — the EOF boundary the cursor advances
/// toward. A value &lt;= 0 is treated as "unknown": progress is not reported and a short segment body
/// is taken as the end of the stream.</param>
/// <param name="seekPosition">Non-null for a seek/refill: the decoder is reinitialized for the
/// header-less Range continuation at this time before the first segment's bytes are fed. Null for a
/// forward load from byte 0, where the first segment carries the header and no reinit is needed.</param>
/// <param name="cancellationToken">Cancels the loop on a track switch, seek or stop.</param>
/// <exception cref="OperationCanceledException">The loop was cancelled via
/// <paramref name="cancellationToken"/>.</exception>
/// <exception cref="InvalidOperationException">The decoder reinit or a chunk hand-off was rejected by
/// the JS player.</exception>
/// <exception cref="System.IO.IOException">A follow-up segment could not be fetched, or a segment body
/// ended before the known total length was reached.</exception>
private async Task RunSegmentedStreamAsync(
    string trackId,
    TrackMediaResponse firstSegment,
    long cursor,
    long totalLength,
    double? seekPosition,
    CancellationToken cancellationToken)
{
    byte[]? buffer = null;
    TrackMediaResponse? segment = firstSegment;
    try
    {
        // Seek/refill: reinitialize the active decoder for the header-less continuation ONCE, before
        // any continuation bytes are fed. A forward-from-zero load (seekPosition null) skips this —
        // its first segment carries the real header. Doing it here keeps seek and forward load on
        // the same fetch+pump mechanism.
        if (seekPosition is { } resumeAt)
        {
            // The decoder byte-counts the continuation against the bytes REMAINING from the range
            // start to EOF (total − cursor), not the absolute total. The loop's own cursor still
            // targets the absolute totalLength for EOF.
            var remainingBytes = Math.Max(0L, totalLength - cursor);
            var reinitResult = await _audioInterop.ReinitializeFromOffset(PlayerId, remainingBytes, resumeAt);
            if (!reinitResult.Success)
            {
                throw new InvalidOperationException($"Failed to reinitialize for offset streaming: {reinitResult.Error}");
            }
        }

        // Rent at the maximum size so the adaptive read size always fits.
        buffer = ArrayPool<byte>.Shared.Rent(MaxBufferSize);
        var readTimer = System.Diagnostics.Stopwatch.StartNew();

        // Segment loop. Each iteration fully consumes one bounded body and advances the cursor by the
        // bytes received; the next segment is fetched only after the inter-segment gate releases.
        // With a known total, EOF is solely the cursor reaching totalLength — a short body before
        // that point is a truncation error, not the end of the file.
        while (true)
        {
            long segmentBytesRead = 0;
            int currentBytes;
            do
            {
                readTimer.Restart();
                currentBytes = await segment.Stream.ReadAsync(buffer, 0, _currentBufferSize, cancellationToken);
                readTimer.Stop();

                AdaptBufferSize(currentBytes, readTimer.ElapsedMilliseconds);

                if (currentBytes > 0)
                {
                    segmentBytesRead += currentBytes;

                    // Copy exactly the bytes read: the pooled buffer may hold stale bytes past
                    // currentBytes from a prior rental, and handing the whole array to JS would
                    // serialise that garbage into the audio stream.
                    var actualBuffer = buffer.AsSpan(0, currentBytes).ToArray();

                    var chunkResult = await _audioInterop.ProcessStreamingChunk(PlayerId, actualBuffer);
                    if (!chunkResult.Success)
                    {
                        var error = $"Failed to process streaming chunk: {chunkResult.Error}";
                        _logger.LogWarning("Chunk processing failed: {Error}", error);
                        throw new InvalidOperationException(error);
                    }

                    CanStartStreaming = chunkResult.CanStartStreaming;
                    HeaderParsed = chunkResult.HeaderParsed;
                    BufferedChunks = chunkResult.BufferCount;

                    // Take the duration from the header the first time it is reported.
                    if (chunkResult.Duration.HasValue && Duration == null)
                    {
                        Duration = chunkResult.Duration.Value;
                        _logger.LogInformation("Duration set from header: {Duration:F2} seconds", Duration);
                        // Feed the once-only duration to the play session for the completion fraction.
                        _playTracker?.SetDuration(chunkResult.Duration.Value);
                    }

                    // Start playback as soon as the min-buffer threshold is met (C2: first audio is
                    // not gated on the segment boundary).
                    if (!_streamingPlaybackStarted && CanStartStreaming)
                    {
                        await TryStartPlaybackAsync();
                    }

                    // Progress against the total file length (cursor + bytes consumed so far).
                    if (totalLength > 0)
                    {
                        LoadProgress = Math.Min(1.0, (double)(cursor + segmentBytesRead) / totalLength);
                    }

                    await ThrottledNotifyStateChanged();

                    // Per-chunk back-pressure. The inter-segment gate alone is matched to WAV's byte
                    // density but not to Opus, where one segment holds far more decodable audio; with
                    // the segment's bytes already in hand the inner loop would otherwise decode the
                    // entire segment ahead of the playhead. So drain per chunk once the scheduler
                    // reports it is over high-water.
                    //
                    // Gated on _streamingPlaybackStarted so this can never block first audio (C2):
                    // until playback starts the playhead does not advance, the fill would never
                    // drain, and the loop would deadlock. The piggybacked flag decides whether to
                    // drain (no extra interop hop); the helper then polls until released.
                    if (_streamingPlaybackStarted && chunkResult.ProductionPaused)
                    {
                        await DrainBackpressureAsync(cancellationToken);
                    }
                }
            } while (currentBytes > 0);

            // Segment fully consumed: advance the cursor and release its stream/socket before
            // deciding whether to fetch the next, so only one segment's raw bytes are ever resident.
            cursor += segmentBytesRead;
            segment.Dispose();
            segment = null;

            // EOF with a known total: the cursor reached the total file length.
            var reachedTotal = totalLength > 0 && cursor >= totalLength;
            if (reachedTotal)
            {
                break;
            }

            if (segmentBytesRead < SegmentSizeBytes)
            {
                // Total unknown: there is no byte count to measure against, so a short body is the
                // only end-of-stream signal available. Complete normally instead of failing every
                // unknown-length stream at its tail.
                if (totalLength <= 0)
                {
                    break;
                }

                // Total known but not reached: the body was cut short (connection drop / premature
                // close). Surface it as an error so the scheduler is halted rather than left to
                // drain its buffered tail into a false end.
                throw new System.IO.IOException(
                    $"Stream truncated at byte {cursor} of {totalLength}: received {segmentBytesRead} bytes " +
                    $"but expected up to {SegmentSizeBytes} and have not reached EOF");
            }

            // Inter-segment back-pressure gate: do not request the next segment until the helper
            // releases. No _streamingPlaybackStarted guard is needed here (unlike the per-chunk
            // gate): a full segment has been consumed, which is far past the playback-start minimum,
            // and a file that fits in one segment has already hit EOF above.
            await DrainBackpressureAsync(cancellationToken);

            // Fetch the next bounded segment. The end offset is not clamped here; a range running
            // past EOF is expected to come back as the available tail.
            var nextEnd = cursor + SegmentSizeBytes - 1;
            var nextResult = await _trackMediaClient.GetTrackMedia(
                trackId,
                byteOffset: cursor,
                byteEnd: nextEnd,
                format: _currentFormat,
                cancellationToken: cancellationToken);
            if (!nextResult.Success || nextResult.Value == null)
            {
                // GetTrackMedia reports a cancelled request as Success == false rather than throwing.
                // Without this check a superseded loop would be handled as a real failure below and
                // reset IsLoaded/IsStreamingMode on state that now belongs to the newer seek/load.
                cancellationToken.ThrowIfCancellationRequested();

                var technicalError = nextResult.GetMessage() ?? "Failed to fetch next stream segment";
                _logger.LogError("Failed to fetch segment at offset {Offset} for {TrackId}: {Error}",
                    cursor, trackId, technicalError);
                throw new System.IO.IOException(technicalError);
            }
            segment = nextResult.Value;
        }

        // Tell the JS decoder the stream is finished. It also marks completion by byte count against
        // the total it was initialized with; this explicit signal flushes the residual tail and
        // covers the case where the total was unknown.
        await _audioInterop.MarkStreamCompleteAsync(PlayerId);

        // Complete-without-start fallback: if the track never crossed the start threshold, the
        // in-loop CanStartStreaming check never fired. With streamComplete now set, starting playback
        // lets the scheduler drain what it has — through the same transition the normal path uses.
        if (!_streamingPlaybackStarted)
        {
            await TryStartPlaybackAsync();
        }

        LoadProgress = 1.0;
        await NotifyStateChanged();
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // Cancellation is expected during track switch or seek — propagate without touching state.
        throw;
    }
    catch (Exception ex)
    {
        StreamingErrorHandler.LogError(_logger, ex, "RunSegmentedStreamAsync");
        ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(ex.Message);
        LoadProgress = 0;
        IsLoaded = false;
        IsStreamingMode = false;
        await NotifyStateChanged();
        throw;
    }
    finally
    {
        // Release whichever segment is still held (non-null only if we left mid-segment) and the buffer.
        segment?.Dispose();
        if (buffer != null)
        {
            ArrayPool<byte>.Shared.Return(buffer);
        }
    }
}
|
||
|
||
/// <summary>
/// Asks the JS player to begin streaming playback and, on success, applies the matching C# state
/// transition. Both the in-loop threshold path and the completion-path fallback go through here, so
/// the started-guard and the session/Duration handling cannot diverge between them. On failure only
/// <c>ErrorMessage</c> is set (and the failure logged); no other state changes.
/// </summary>
private async Task TryStartPlaybackAsync()
{
    var startResult = await _audioInterop.StartStreamingPlayback(PlayerId);
    if (!startResult.Success)
    {
        var technicalError = $"Failed to start streaming playback: {startResult.Error}";
        _logger.LogError("Failed to start playback: {Error}", technicalError);
        ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
        return;
    }

    _streamingPlaybackStarted = true;
    IsPlaying = true;
    IsPaused = false;
    IsLoaded = true; // ready to play even though the download may still be running
    ErrorMessage = null;

    // Open the play session once per load, at the moment playback really begins (§2.1). A
    // seek/refill re-stream re-enters this transition with _streamingPlaybackStarted reset; the
    // _sessionOpened guard stops it from opening a second session for the same play. The duration
    // may already be known by now, so it is fed again.
    if (!_sessionOpened && _currentTrackId is { } trackKey)
    {
        _sessionOpened = true;
        _playTracker?.OnPlaybackStarted(trackKey);
        if (Duration is { } knownDuration)
        {
            _playTracker?.SetDuration(knownDuration);
        }
    }

    // Not throttled: this is a critical state change.
    await NotifyStateChanged();
}
|
||
|
||
/// <summary>
/// Stops playback. For streaming playback this is the same as <see cref="Unload"/>: the audio data
/// has been consumed and cannot be replayed, so the player is reset all the way to Idle.
/// </summary>
public override async Task Stop() => await ResetToIdle();
|
||
|
||
/// <summary>
/// Resets the player completely to Idle so it is ready for a new track.
/// </summary>
public override async Task Unload() => await ResetToIdle();
|
||
|
||
/// <summary>
/// Streaming-aware seek. Asks the JS player to seek; if it reports the target lies beyond the
/// buffered content (with a usable byte offset) a new stream is started from that offset, otherwise
/// the seek is treated as satisfied within the buffer. Does nothing unless a track is loaded in
/// streaming mode. Failures are surfaced through <c>ErrorMessage</c> rather than thrown.
/// </summary>
/// <param name="position">Target position passed to the JS player's seek.</param>
public override async Task Seek(double position)
{
    if (!IsLoaded || !IsStreamingMode)
    {
        return;
    }

    try
    {
        var seekResult = await _audioInterop.SeekAsync(PlayerId, position);

        if (!seekResult.Success)
        {
            ErrorMessage = $"Seek error: {seekResult.Error}";
            await NotifyStateChanged();
            return;
        }

        var needsRefill = seekResult.SeekBeyondBuffer && seekResult.ByteOffset >= 0;
        if (!needsRefill)
        {
            // The seek was satisfied from what is already buffered.
            CurrentTime = position;
            ErrorMessage = null;
            await NotifyStateChanged();
            return;
        }

        // The target is outside the buffer: a new stream must be loaded from the byte offset.
        _logger.LogInformation("Seeking beyond buffer to {Position:F2}s, byte offset: {ByteOffset}",
            position, seekResult.ByteOffset);
        await SeekBeyondBuffer(position, seekResult.ByteOffset);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error seeking to position {Position}", position);
        ErrorMessage = $"Error seeking: {ex.Message}";
        await NotifyStateChanged();
    }
}
|
||
|
||
/// <summary>
/// Serves a seek whose target is outside the buffered content: cancels and drains the running
/// streaming loop, fetches the first bounded segment at <paramref name="byteOffset"/>, and hands it
/// to the shared segment loop. Failures recover into a paused-but-loaded state, but only while this
/// seek is still the active one.
/// </summary>
/// <param name="seekPosition">Target playback time; shown immediately as <c>CurrentTime</c> and
/// passed to the segment loop for the decoder reinit.</param>
/// <param name="byteOffset">File byte offset the new stream starts from.</param>
private async Task SeekBeyondBuffer(double seekPosition, long byteOffset)
{
    if (string.IsNullOrEmpty(_currentTrackId))
    {
        ErrorMessage = "Cannot seek - no track loaded";
        return;
    }

    // Snapshot the id: a track switch could clear the field, but this seek works against the track
    // that is loaded right now and the segment loop needs a stable value.
    var trackId = _currentTrackId;

    IsSeekingBeyondBuffer = true;

    // Cancel the running loop AND wait for it to exit before starting another. Its pending
    // ReadAsync throws OperationCanceledException asynchronously; starting a new loop straight away
    // would let both race against the single-instance JS StreamDecoder and corrupt decode state.
    //
    // Invariant: a caller that supersedes a load WITHOUT wanting the load's state reset must install
    // its own CTS in _streamingCancellation *before* its first await. LoadTrackStreaming's OCE
    // continuation runs during the drain await on the shared _activeStreamingTask and resets
    // IsLoaded/IsStreamingMode only when _streamingCancellation still equals its loadCts. Installing
    // seekCts synchronously here makes that identity check fail, so the seek's state survives.
    // (ResetToIdle deliberately does NOT do this — it wants the reset, and nulls
    // _streamingCancellation only after the drain.)
    var supersededCts = _streamingCancellation;
    var seekCts = new CancellationTokenSource();
    _streamingCancellation = seekCts;
    supersededCts?.Cancel();
    await DrainActiveStreamingTaskAsync();
    supersededCts?.Dispose();

    // Single-writer discipline (C6/AC8): every failure exit shares this guard.
    // TrackMediaClient.GetTrackMedia swallows OperationCanceledException and returns Success==false,
    // so a superseded seek ends up in the media-fetch-fail branch rather than the OCE catch. Without
    // the guard that branch would run RecoverFromFailedRefill against player state the NEWER seek
    // now owns. One local predicate keeps all exits symmetric.
    bool IsStillActiveSeek() => ReferenceEquals(_streamingCancellation, seekCts);

    try
    {
        // Reflect the target in the UI straight away.
        CurrentTime = seekPosition;
        await NotifyStateChanged();

        // Request the FIRST bounded segment from the resolved offset (Direction B — converged with
        // the forward path), in the format the initial load resolved to (_currentFormat): an Opus
        // seek must come back as Opus bytes so the cached setup header and the page-aligned offset
        // match the continuation, while WAV resolves its offset from the header (AC9 / §3.4a C).
        // The segment loop then continues forward from this offset exactly as a fresh load does
        // from 0 (C1/C5).
        var fetchResult = await _trackMediaClient.GetTrackMedia(
            trackId,
            byteOffset,
            byteEnd: byteOffset + SegmentSizeBytes - 1,
            format: _currentFormat,
            cancellationToken: seekCts.Token);
        if (!fetchResult.Success || fetchResult.Value == null)
        {
            var technicalError = fetchResult.GetMessage() ?? "Failed to load audio from position";
            _logger.LogError("Failed to get track media from offset {Offset}: {Error}", byteOffset, technicalError);

            // A superseded seek must NOT touch shared state; the newer seek owns teardown.
            if (!IsStillActiveSeek())
            {
                _logger.LogDebug("Media-fetch failed on superseded seek to {Position} — newer seek owns state, skipping recovery", seekPosition);
                return;
            }

            await RecoverFromFailedRefill(seekPosition, StreamingErrorHandler.GetUserFriendlyMessage(technicalError));
            return;
        }

        var audio = fetchResult.Value;

        // Absolute EOF boundary for the segment loop's cursor. A 206 carries the file total in
        // Content-Range; on a 200 (single-segment file) fall back to cursor + body length.
        var totalLength = audio.TotalLength ?? (byteOffset + audio.ContentLength);

        // Reset per-stream state. The decoder reinit for the header-less continuation happens INSIDE
        // RunSegmentedStreamAsync (seekPosition non-null); a reinit failure there throws into the
        // general catch below, which recovers when this is still the active seek (AC6).
        _streamingPlaybackStarted = false;
        CanStartStreaming = false;
        HeaderParsed = false;
        BufferedChunks = 0;

        // Run the shared segment loop from the offset. It takes ownership of `audio`.
        _activeStreamingTask = RunSegmentedStreamAsync(
            trackId, audio, cursor: byteOffset, totalLength, seekPosition, seekCts.Token);
        await _activeStreamingTask;

        IsSeekingBeyondBuffer = false;
    }
    catch (OperationCanceledException) when (seekCts.IsCancellationRequested)
    {
        // Interrupted by another seek or a stop. Clear the flag only if this is still the active
        // seek; once _streamingCancellation has been replaced, the newer seek owns the flag.
        _logger.LogDebug("Seek beyond buffer cancelled");
        if (IsStillActiveSeek())
        {
            IsSeekingBeyondBuffer = false;
        }
    }
    catch (Exception ex)
    {
        // A refill fetch the listener did not initiate can fail deep into a long mix. Recover into a
        // clean paused-but-loaded state (AC6) rather than leaving the starved scheduler to fire a
        // silent false end — but only while still the active seek; a superseding seek owns the state.
        _logger.LogError(ex, "Error during seek beyond buffer to position {Position}", seekPosition);
        if (IsStillActiveSeek())
        {
            await RecoverFromFailedRefill(seekPosition, StreamingErrorHandler.GetUserFriendlyMessage(ex.Message));
        }
    }
}
|
||
|
||
/// <summary>
/// Clean-failure recovery for a window-miss refill (Phase 21.3 / AC6). When the mid-stream refill
/// fetch fails, the pre-seek loop has already been cancelled and drained, but the JS scheduler still
/// holds stale pre-seek buffers and is still "playing" — left alone it would drain them and fire a
/// silent false end. This halts the scheduler at <paramref name="seekPosition"/>, surfaces the error,
/// and keeps the track loaded so the listener can retry the seek or choose another track. Mirrors
/// <c>PlaybackScheduler.playFromPosition</c>'s end-of-buffer recovery: stop pretending to play.
/// </summary>
/// <param name="seekPosition">Position the player is anchored at after recovery.</param>
/// <param name="userFacingError">Message stored in <c>ErrorMessage</c>.</param>
private async Task RecoverFromFailedRefill(double seekPosition, string userFacingError)
{
    // Halt the starved scheduler on the JS side. Best-effort: an unsuccessful result is only logged,
    // and the C# state below is settled regardless.
    var haltResult = await _audioInterop.RecoverFromFailedRefill(PlayerId, seekPosition);
    if (!haltResult.Success)
    {
        _logger.LogWarning("Refill-failure recovery interop did not succeed: {Error}", haltResult.Error);
    }

    // Settle into the matching recoverable state: not playing, paused at the target, still loaded,
    // still streaming. IsLoaded and IsStreamingMode must both stay true — Seek returns early when
    // either is false, so clearing one would wedge a retry route (AC6 / Phase 21.3).
    ErrorMessage = userFacingError;
    IsPlaying = false;
    IsPaused = true;
    IsLoaded = true;
    IsStreamingMode = true;
    CurrentTime = seekPosition;
    IsSeekingBeyondBuffer = false;

    await NotifyStateChanged();
}
|
||
|
||
/// <summary>
/// The one place all state is reset to Idle. Used by Stop, Unload and dispose, and as the prologue
/// of a new load.
/// </summary>
private async Task ResetToIdle()
{
    // Step 0 — close any open play session BEFORE teardown (§2.1). Because every stop / unload /
    // dispose / track-switch funnels through here, a superseded listen gets recorded with its
    // high-water bucket. Close is idempotent, so an already-closed session makes this a no-op.
    _playTracker?.Close();
    _sessionOpened = false;

    // Step 1 — cancel the streaming loop and wait for it to exit before touching JS state;
    // otherwise its pending ProcessStreamingChunk call could land after StopAsync/UnloadAsync.
    _streamingCancellation?.Cancel();
    await DrainActiveStreamingTaskAsync();
    _streamingCancellation?.Dispose();
    _streamingCancellation = null;

    // Step 2 — stop and unload on the JS side.
    try
    {
        await _audioInterop.StopAsync(PlayerId);
        await _audioInterop.UnloadAsync(PlayerId);
    }
    catch
    {
        // JS errors during cleanup are deliberately ignored.
    }

    // Step 3 — general player state back to Idle.
    IsPlaying = false;
    IsPaused = false;
    IsLoaded = false;
    IsLoading = false;
    CurrentTime = 0;
    Duration = null;
    LoadProgress = 0;
    ErrorMessage = null;
    CurrentTrack = null;
    WaveformProfile = null;

    // Step 4 — streaming-specific state.
    IsStreamingMode = false;
    CanStartStreaming = false;
    HeaderParsed = false;
    BufferedChunks = 0;
    _streamingPlaybackStarted = false;
    IsSeekingBeyondBuffer = false;
    _currentTrackId = null;
    _currentFormat = AudioFormat.Lossless;

    await NotifyStateChanged();
}
|
||
|
||
/// <summary>
/// Awaits the streaming loop that was started earlier until it has fully exited. The caller is
/// expected to have cancelled <see cref="_streamingCancellation"/> already. Nothing the loop throws
/// escapes from here: cancellation is the expected outcome, and any other failure was surfaced by
/// the loop's own catch block.
/// </summary>
private async Task DrainActiveStreamingTaskAsync()
{
    var pending = _activeStreamingTask;
    if (pending is null)
    {
        return;
    }

    try
    {
        await pending;
    }
    catch
    {
        // Either the OperationCanceledException we caused ourselves, or a failure that was already
        // logged inside the loop — neither is reported again here.
    }
    finally
    {
        // Clear the field only if it still refers to the task we drained; a concurrent caller may
        // have started a new stream in the meantime.
        if (ReferenceEquals(_activeStreamingTask, pending))
        {
            _activeStreamingTask = null;
        }
    }
}
|
||
|
||
/// <summary>
/// Holds the segment loop while <c>IsPaused</c> is set or the JS side reports production paused
/// (the scheduler's forward fill is over high-water — Phase 21.2 hysteresis), polling every
/// <c>BackpressurePollMs</c>. Used by both the per-chunk gate and the inter-segment gate so the two
/// paths apply the same drain discipline.
/// <para>
/// Returns at once when neither condition holds. While <c>IsPaused</c> is set the interop query is
/// skipped (UC5: a user pause freezes the playhead, so the fill cannot drain by itself). Each wait
/// observes <paramref name="cancellationToken"/>, so a track switch or seek during the hold surfaces
/// as an OperationCanceledException and unwinds through the normal drain path (C6).
/// </para>
/// </summary>
/// <param name="cancellationToken">Token of the owning streaming loop.</param>
private async Task DrainBackpressureAsync(CancellationToken cancellationToken)
{
    while (true)
    {
        // Short-circuit keeps the interop call from running while the user has paused.
        var released = !IsPaused && !await _audioInterop.IsProductionPaused(PlayerId);
        if (released)
        {
            return;
        }

        cancellationToken.ThrowIfCancellationRequested();
        await Task.Delay(BackpressurePollMs, cancellationToken);
    }
}
|
||
|
||
/// <summary>
/// Raises a state-changed notification unless one was already raised through this method less than
/// <c>NotificationThrottleMs</c> milliseconds ago, in which case the call is dropped.
/// </summary>
private async Task ThrottledNotifyStateChanged()
{
    var timestamp = DateTime.UtcNow;
    var sinceLast = timestamp - _lastNotification;
    if (sinceLast.TotalMilliseconds < NotificationThrottleMs)
    {
        return;
    }

    _lastNotification = timestamp;
    await NotifyStateChanged();
}
|
||
|
||
/// <summary>
/// Tears the service down on component unmount. The in-flight streaming loop must be cancelled and
/// the JS callbacks removed before the JS side's setInterval fires again with a stale
/// DotNetObjectReference. ResetToIdle handles cancellation, the JS stop and the state reset; the
/// base implementation then disposes the JS player and its callbacks.
/// </summary>
public override async ValueTask DisposeAsync()
{
    try
    {
        // Also closes any open play session, so disposing mid-play still records the listen.
        await ResetToIdle();
    }
    catch
    {
        // Disposal must never throw; this cleanup is best-effort.
    }

    // Detach the page-unload handler so the torn-down circuit is never invoked, then drop the
    // self-reference. Best-effort — the JS side tolerates an absent key.
    if (_unloadKey is not null && _beacon is not null)
    {
        try
        {
            await _beacon.UnregisterUnloadAsync(_unloadKey);
        }
        catch
        {
            /* best-effort */
        }
    }

    _unloadRef?.Dispose();
    _unloadRef = null;

    await base.DisposeAsync();
}
|
||
|
||
/// <summary>
/// Adjusts the read size used by the streaming loop from the outcome of the latest read. Three slow
/// reads in a row halve it (never below <c>MinBufferSize</c>); a fast read that filled the whole
/// request doubles it (never above <c>MaxBufferSize</c>). Any read that is not slow resets the
/// slow-read streak.
/// </summary>
/// <param name="bytesRead">Bytes returned by the read.</param>
/// <param name="readTimeMs">How long the read took, in milliseconds.</param>
private void AdaptBufferSize(int bytesRead, long readTimeMs)
{
    const int SlowReadThresholdMs = 100;
    const int FastReadThresholdMs = 20;
    const int SlowReadsBeforeShrink = 3;

    var isSlowRead = readTimeMs > SlowReadThresholdMs;
    if (!isSlowRead)
    {
        // Every non-slow read breaks the streak, whether or not the size grows.
        _consecutiveSlowReads = 0;

        var filledRequest = readTimeMs < FastReadThresholdMs && bytesRead == _currentBufferSize;
        if (filledRequest && _currentBufferSize < MaxBufferSize)
        {
            // Fast connection using the full request: read more per call.
            _currentBufferSize = Math.Min(MaxBufferSize, _currentBufferSize * 2);
        }

        return;
    }

    _consecutiveSlowReads++;
    if (_consecutiveSlowReads >= SlowReadsBeforeShrink && _currentBufferSize > MinBufferSize)
    {
        // Sustained slow reads: read less per call and start counting afresh.
        _currentBufferSize = Math.Max(MinBufferSize, _currentBufferSize / 2);
        _consecutiveSlowReads = 0;
    }
}
|
||
} |