using DeepDrftModels.DTOs;
using DeepDrftPublic.Client.Clients;
using System.Buffers;
using Microsoft.Extensions.Logging;

namespace DeepDrftPublic.Client.Services;

/// <summary>
/// Audio player that downloads a track in chunks, forwards each chunk to the JS
/// audio interop layer, and starts playback as soon as the interop layer reports
/// that enough data has been buffered. Also handles seeking past the buffered
/// region by requesting a new stream from a byte offset.
/// </summary>
public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerService
{
    // Configuration constants
    private const int DefaultBufferSize = 32 * 1024; // 32KB chunks
    private const int NotificationThrottleMs = 100;  // Throttle UI updates to max 10 per second

    // Adaptive chunk sizing
    private const int MinBufferSize = 16 * 1024; // 16KB minimum
    private const int MaxBufferSize = 64 * 1024; // 64KB maximum
    private int _currentBufferSize = DefaultBufferSize;
    private int _consecutiveSlowReads = 0;

    // Streaming state properties
    public bool IsStreamingMode { get; private set; } = false;
    public bool CanStartStreaming { get; private set; } = false;
    public bool HeaderParsed { get; private set; } = false;
    public int BufferedChunks { get; private set; } = 0;
    public bool IsSeekingBeyondBuffer { get; private set; } = false;

    private bool _streamingPlaybackStarted = false;
    private CancellationTokenSource? _streamingCancellation;
    private Task? _activeStreamingTask;
    private DateTime _lastNotification = DateTime.MinValue;
    private readonly ILogger _logger;
    private string? _currentTrackId;

    /// <summary>
    /// Creates the streaming player on top of the base player's interop and media client.
    /// </summary>
    /// <param name="audioInterop">JS audio interop service, passed to the base class.</param>
    /// <param name="trackMediaClient">Client used to request track media, passed to the base class.</param>
    /// <param name="logger">Logger used for diagnostics.</param>
    // NOTE(review): the logger is typed as the non-generic ILogger here; the default
    // logging registration normally exposes ILogger<T> — confirm how this is resolved.
    public StreamingAudioPlayerService(
        AudioInteropService audioInterop,
        TrackMediaClient trackMediaClient,
        ILogger logger)
        : base(audioInterop, trackMediaClient)
    {
        _logger = logger;
    }

    /// <summary>
    /// Selects a track; this player always loads it in streaming mode.
    /// </summary>
    /// <param name="track">The track to load and play.</param>
    public override async Task SelectTrack(TrackDto track)
    {
        await SelectTrackStreaming(track);
    }

    /// <summary>
    /// Initializes the player if needed, readies the audio context, and streams the track.
    /// </summary>
    /// <param name="track">The track to load and play.</param>
    public async Task SelectTrackStreaming(TrackDto track)
    {
        await EnsureInitializedAsync();

        // Resume AudioContext immediately on track selection (user interaction) to avoid clicks later
        await _audioInterop.EnsureAudioContextReady(PlayerId);

        await NotifyTrackSelected();
        await LoadTrackStreaming(track);
        await NotifyStateChanged();
    }

    /// <summary>
    /// Resets the player, requests the track media from byte offset 0, initializes
    /// streaming on the JS side, and runs the streaming loop until it completes,
    /// fails, or is cancelled.
    /// </summary>
    /// <param name="track">The track to stream.</param>
    private async Task LoadTrackStreaming(TrackDto track)
    {
        // Always reset to clean state before loading new track. ResetToIdle
        // both cancels and awaits any in-flight streaming loop, so by the time
        // we return from it the previous loop is guaranteed to have exited and
        // there is no risk of interleaved ProcessStreamingChunk calls against
        // the single-instance JS StreamDecoder.
        await ResetToIdle();

        // Save track ID for seek operations
        _currentTrackId = track.EntryKey;

        // Expose to UI immediately — Now-Playing surfaces should reflect the selected
        // track while it's still loading, not only after playback starts.
        CurrentTrack = track;

        // Create new cancellation token for this streaming operation. The source and
        // its token are captured in locals: a re-entrant ResetToIdle may null or
        // dispose the field while this method is suspended at an await.
        var cancellation = new CancellationTokenSource();
        _streamingCancellation = cancellation;
        var token = cancellation.Token;

        try
        {
            // Set state to indicate loading has started
            ErrorMessage = null;
            LoadProgress = 0;
            IsLoading = true;
            IsStreamingMode = true;

            // Reset adaptive buffer sizing
            _currentBufferSize = DefaultBufferSize;
            _consecutiveSlowReads = 0;

            await NotifyStateChanged();

            // Pass the streaming token to the HTTP layer so a navigation/track switch
            // aborts the server connection instead of leaving it draining bytes.
            var mediaResult = await _trackMediaClient.GetTrackMedia(
                track.EntryKey,
                byteOffset: 0,
                cancellationToken: token);

            // If this load was cancelled while the request was in flight, treat it as
            // a cancellation rather than reporting an error for a stale request.
            token.ThrowIfCancellationRequested();

            if (!mediaResult.Success)
            {
                var technicalError = mediaResult.GetMessage();
                _logger.LogError("Failed to get track media for {TrackId}: {Error}", track.EntryKey, technicalError);
                ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
                return;
            }

            if (mediaResult.Value == null)
            {
                const string technicalError = "No audio returned from server";
                _logger.LogError("No audio data returned for track {TrackId}", track.EntryKey);
                ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
                return;
            }

            using var audio = mediaResult.Value;

            // Initialize streaming mode with content length
            var streamingResult = await _audioInterop.InitializeStreaming(PlayerId, audio.ContentLength);
            if (!streamingResult.Success)
            {
                var technicalError = $"Failed to initialize streaming: {streamingResult.Error}";
                _logger.LogError("Streaming initialization failed for track {TrackId}: {Error}", track.EntryKey, technicalError);
                ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
                return;
            }

            // Never register a loop for an operation that was cancelled while awaiting
            // above; doing so would overwrite the active task of whichever operation
            // superseded this one.
            token.ThrowIfCancellationRequested();

            _activeStreamingTask = StreamAudioWithEarlyPlayback(audio, token);
            await _activeStreamingTask;
        }
        catch (OperationCanceledException)
        {
            // Cancellation is expected, reset state
            _logger.LogDebug("Audio streaming cancelled for track {TrackId}", track.EntryKey);
            if (OwnsStreamingState(cancellation))
            {
                IsLoaded = false;
                IsStreamingMode = false;
            }
        }
        catch (Exception ex)
        {
            StreamingErrorHandler.LogError(_logger, ex, "LoadTrackStreaming", track.EntryKey);
            if (OwnsStreamingState(cancellation))
            {
                ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(ex.Message);
                LoadProgress = 0;
                IsLoaded = false;
                IsStreamingMode = false;
            }
        }
        finally
        {
            // Only clear the loading flag if a newer operation has not taken over.
            if (OwnsStreamingState(cancellation))
            {
                IsLoading = false;
            }

            await NotifyStateChanged();
        }
    }

    /// <summary>
    /// Reads the response stream chunk by chunk, forwards each chunk to the JS
    /// interop layer, starts playback once the interop layer reports it can start,
    /// and signals stream completion when the stream ends.
    /// </summary>
    /// <param name="audio">The media response whose stream is read.</param>
    /// <param name="cancellationToken">Token that aborts the read loop.</param>
    /// <exception cref="OperationCanceledException">The token was cancelled; rethrown without touching player state.</exception>
    /// <exception cref="InvalidOperationException">The interop layer reported a chunk-processing failure.</exception>
    private async Task StreamAudioWithEarlyPlayback(TrackMediaResponse audio, CancellationToken cancellationToken)
    {
        byte[]? buffer = null;
        try
        {
            long totalBytesRead = 0;

            // Rent larger buffer to accommodate adaptive sizing
            buffer = ArrayPool<byte>.Shared.Rent(MaxBufferSize);

            int currentBytes;
            var readTimer = System.Diagnostics.Stopwatch.StartNew();

            do
            {
                readTimer.Restart();
                currentBytes = await audio.Stream.ReadAsync(buffer, 0, _currentBufferSize, cancellationToken);
                readTimer.Stop();

                // Adapt buffer size based on read performance
                AdaptBufferSize(currentBytes, readTimer.ElapsedMilliseconds);

                if (currentBytes > 0)
                {
                    totalBytesRead += currentBytes;

                    // Always slice to the exact number of bytes read. The pooled buffer
                    // is rented at MaxBufferSize and may carry stale bytes past
                    // currentBytes from a prior rental — handing the full array to JS
                    // interop would serialise that garbage into the audio stream.
                    var actualBuffer = buffer.AsSpan(0, currentBytes).ToArray();

                    // Process chunk for streaming
                    var chunkResult = await _audioInterop.ProcessStreamingChunk(PlayerId, actualBuffer);
                    if (!chunkResult.Success)
                    {
                        var error = $"Failed to process streaming chunk: {chunkResult.Error}";
                        _logger.LogWarning("Chunk processing failed: {Error}", error);
                        throw new InvalidOperationException(error);
                    }

                    // Update streaming state
                    CanStartStreaming = chunkResult.CanStartStreaming;
                    HeaderParsed = chunkResult.HeaderParsed;
                    BufferedChunks = chunkResult.BufferCount;

                    // Set duration from WAV header when available (only set once)
                    if (chunkResult.Duration.HasValue && Duration == null)
                    {
                        Duration = chunkResult.Duration.Value;
                        _logger.LogInformation("Duration set from WAV header: {Duration:F2} seconds", Duration);
                    }

                    // Start playback as soon as we can
                    if (!_streamingPlaybackStarted && CanStartStreaming)
                    {
                        var playbackResult = await _audioInterop.StartStreamingPlayback(PlayerId);
                        if (playbackResult.Success)
                        {
                            _streamingPlaybackStarted = true;
                            IsPlaying = true;
                            IsPaused = false;
                            IsLoaded = true; // Track is loaded and ready to play (even if still downloading)
                            ErrorMessage = null;
                            await NotifyStateChanged(); // Immediate notification for critical state change
                        }
                        else
                        {
                            var technicalError = $"Failed to start streaming playback: {playbackResult.Error}";
                            _logger.LogError("Failed to start playback: {Error}", technicalError);
                            ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
                        }
                    }

                    // Update progress
                    if (audio.ContentLength > 0)
                    {
                        LoadProgress = Math.Min(1.0, (double)totalBytesRead / audio.ContentLength);
                    }

                    await ThrottledNotifyStateChanged();
                }
            } while (currentBytes > 0);

            // Notify the JS decoder that the stream is finished. When the server omits
            // Content-Length the StreamDecoder cannot determine completion via byte counting
            // alone; this explicit signal ensures the tail-decoding path (streamComplete=true)
            // fires regardless of whether Content-Length was present.
            await _audioInterop.MarkStreamCompleteAsync(PlayerId);

            // Mark as fully loaded
            LoadProgress = 1.0;
            await NotifyStateChanged();
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // A deliberate cancel (seek, stop, track switch) is not a failure: do not
            // log it as an error or surface an error message. The caller that
            // requested the cancellation owns the resulting state transition.
            throw;
        }
        catch (Exception ex)
        {
            StreamingErrorHandler.LogError(_logger, ex, "StreamAudioWithEarlyPlayback");
            ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(ex.Message);
            LoadProgress = 0;
            IsLoaded = false;
            IsStreamingMode = false;
            await NotifyStateChanged();
            throw;
        }
        finally
        {
            if (buffer != null)
            {
                ArrayPool<byte>.Shared.Return(buffer);
            }
        }
    }

    /// <summary>
    /// In streaming mode, Stop fully resets to Idle state since audio data is consumed.
    /// This is equivalent to Unload for streaming playback.
    /// </summary>
    public override async Task Stop()
    {
        // In streaming mode, Stop = Unload (data is consumed, can't replay)
        await ResetToIdle();
    }

    /// <summary>
    /// Fully resets the player to Idle state, ready for a new track.
    /// </summary>
    public override async Task Unload()
    {
        await ResetToIdle();
    }

    /// <summary>
    /// Override Seek to handle seek-beyond-buffer for streaming mode.
    /// Does nothing unless a track is loaded and the player is in streaming mode.
    /// </summary>
    /// <param name="position">Target position passed to the interop seek call (logged as seconds).</param>
    public override async Task Seek(double position)
    {
        if (!IsLoaded || !IsStreamingMode)
        {
            return;
        }

        try
        {
            var result = await _audioInterop.SeekAsync(PlayerId, position);

            if (result.Success)
            {
                if (result.SeekBeyondBuffer && result.ByteOffset > 0)
                {
                    // Need to load new stream from offset
                    _logger.LogInformation("Seeking beyond buffer to {Position:F2}s, byte offset: {ByteOffset}", position, result.ByteOffset);
                    await SeekBeyondBuffer(position, result.ByteOffset);
                }
                else
                {
                    // Seek within buffer succeeded
                    CurrentTime = position;
                    ErrorMessage = null;
                    await NotifyStateChanged();
                }
            }
            else
            {
                ErrorMessage = $"Seek error: {result.Error}";
                await NotifyStateChanged();
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error seeking to position {Position}", position);
            ErrorMessage = $"Error seeking: {ex.Message}";
            await NotifyStateChanged();
        }
    }

    /// <summary>
    /// Handle seeking beyond the currently buffered content by requesting a new stream from offset.
    /// </summary>
    /// <param name="seekPosition">Position the UI should show while the new stream loads.</param>
    /// <param name="byteOffset">Byte offset to request the new stream from.</param>
    private async Task SeekBeyondBuffer(double seekPosition, long byteOffset)
    {
        if (string.IsNullOrEmpty(_currentTrackId))
        {
            ErrorMessage = "Cannot seek - no track loaded";
            await NotifyStateChanged();
            return;
        }

        IsSeekingBeyondBuffer = true;

        // Cancel the current streaming loop AND wait for it to fully exit before
        // starting a new one. The previous loop's pending ReadAsync will throw
        // OperationCanceledException asynchronously; if we kick off a new loop
        // immediately, both can race against the single-instance JS StreamDecoder
        // and corrupt decode state. Draining here is the load-bearing guarantee.
        _streamingCancellation?.Cancel();
        await DrainActiveStreamingTaskAsync();
        _streamingCancellation?.Dispose();

        // Capture the new source and token locally so a re-entrant reset cannot
        // null the field out from under this method between awaits.
        var cancellation = new CancellationTokenSource();
        _streamingCancellation = cancellation;
        var token = cancellation.Token;

        // The operation we just cancelled clears this flag in its own cancellation
        // handler, so re-assert it now that the old loop has exited.
        IsSeekingBeyondBuffer = true;

        try
        {
            // Update UI immediately
            CurrentTime = seekPosition;
            await NotifyStateChanged();

            // Request new stream from offset
            var mediaResult = await _trackMediaClient.GetTrackMedia(
                _currentTrackId,
                byteOffset,
                cancellationToken: token);

            if (!mediaResult.Success || mediaResult.Value == null)
            {
                var technicalError = mediaResult.GetMessage() ?? "Failed to load audio from position";
                _logger.LogError("Failed to get track media from offset {Offset}: {Error}", byteOffset, technicalError);
                ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
                IsSeekingBeyondBuffer = false;
                await NotifyStateChanged();
                return;
            }

            using var audio = mediaResult.Value;

            // Reinitialize JS player for offset streaming
            var reinitResult = await _audioInterop.ReinitializeFromOffset(PlayerId, audio.ContentLength, seekPosition);
            if (!reinitResult.Success)
            {
                _logger.LogError("Failed to reinitialize for offset streaming: {Error}", reinitResult.Error);
                ErrorMessage = "Failed to seek to position";
                IsSeekingBeyondBuffer = false;
                await NotifyStateChanged();
                return;
            }

            // Do not start a loop for a seek that was superseded while awaiting above.
            token.ThrowIfCancellationRequested();

            // Reset streaming state for new stream
            _streamingPlaybackStarted = false;
            CanStartStreaming = false;
            HeaderParsed = false;
            BufferedChunks = 0;

            // Cancelling the previous loop makes its owner clear IsStreamingMode;
            // restore it, otherwise every later Seek is rejected by its guard.
            IsStreamingMode = true;

            // Stream audio from offset
            _activeStreamingTask = StreamAudioWithEarlyPlayback(audio, token);
            await _activeStreamingTask;

            IsSeekingBeyondBuffer = false;
            await NotifyStateChanged();
        }
        catch (OperationCanceledException)
        {
            // Another seek or stop interrupted this one
            _logger.LogDebug("Seek beyond buffer cancelled");
            if (OwnsStreamingState(cancellation))
            {
                IsSeekingBeyondBuffer = false;
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error during seek beyond buffer to position {Position}", seekPosition);
            if (OwnsStreamingState(cancellation))
            {
                ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(ex.Message);
                IsSeekingBeyondBuffer = false;
            }

            await NotifyStateChanged();
        }
    }

    /// <summary>
    /// Returns true while <paramref name="cancellation"/> is still the source stored in
    /// the field, i.e. no later load, seek, or reset has replaced or cleared it.
    /// </summary>
    private bool OwnsStreamingState(CancellationTokenSource cancellation)
    {
        return ReferenceEquals(_streamingCancellation, cancellation);
    }

    /// <summary>
    /// Single method to reset all state - called by both Stop and Unload.
    /// </summary>
    private async Task ResetToIdle()
    {
        // 1. Cancel any ongoing streaming operation and wait for it to exit
        //    before tearing down JS state. Otherwise the loop's pending
        //    ProcessStreamingChunk call can land after StopAsync/UnloadAsync.
        _streamingCancellation?.Cancel();
        await DrainActiveStreamingTaskAsync();
        _streamingCancellation?.Dispose();
        _streamingCancellation = null;

        // 2. Tell JS to stop and unload
        try
        {
            await _audioInterop.StopAsync(PlayerId);
            await _audioInterop.UnloadAsync(PlayerId);
        }
        catch (Exception ex)
        {
            // JS errors during cleanup are deliberately non-fatal; record them for
            // diagnostics instead of discarding them silently.
            _logger.LogDebug(ex, "Ignoring JS error during player cleanup");
        }

        // 3. Reset ALL state to Idle
        IsPlaying = false;
        IsPaused = false;
        IsLoaded = false;
        IsLoading = false;
        CurrentTime = 0;
        Duration = null;
        LoadProgress = 0;
        ErrorMessage = null;
        CurrentTrack = null;

        // 4. Reset streaming-specific state
        IsStreamingMode = false;
        CanStartStreaming = false;
        HeaderParsed = false;
        BufferedChunks = 0;
        _streamingPlaybackStarted = false;
        IsSeekingBeyondBuffer = false;
        _currentTrackId = null;

        await NotifyStateChanged();
    }

    /// <summary>
    /// Wait for the previously-started streaming loop to fully exit. The caller
    /// must have already cancelled <see cref="_streamingCancellation"/>. Swallows
    /// the expected OperationCanceledException; any other exception was already
    /// surfaced through the loop's own catch block, so we ignore it here too.
    /// </summary>
    private async Task DrainActiveStreamingTaskAsync()
    {
        var task = _activeStreamingTask;
        if (task == null)
        {
            return;
        }

        try
        {
            await task;
        }
        catch (OperationCanceledException)
        {
            // Expected when we cancelled the loop ourselves.
        }
        catch
        {
            // Any other failure was already logged inside the loop.
        }
        finally
        {
            // Only clear if we are still the active task — a concurrent caller
            // may have started a new stream while we were draining the old one.
            if (ReferenceEquals(_activeStreamingTask, task))
            {
                _activeStreamingTask = null;
            }
        }
    }

    /// <summary>
    /// Raises a state-changed notification at most once per
    /// <see cref="NotificationThrottleMs"/> milliseconds.
    /// </summary>
    private async Task ThrottledNotifyStateChanged()
    {
        var now = DateTime.UtcNow;
        if ((now - _lastNotification).TotalMilliseconds >= NotificationThrottleMs)
        {
            _lastNotification = now;
            await NotifyStateChanged();
        }
    }

    /// <summary>
    /// On component unmount we must cancel the in-flight streaming loop and tear
    /// down JS callbacks before the JS side's setInterval fires again with a
    /// stale DotNetObjectReference. ResetToIdle covers cancellation + JS stop
    /// + state reset; the base then disposes the JS player and its callbacks.
    /// </summary>
    public override async ValueTask DisposeAsync()
    {
        try
        {
            await ResetToIdle();
        }
        catch
        {
            // Disposal must not throw; any failure here is best-effort cleanup.
        }

        await base.DisposeAsync();
    }

    /// <summary>
    /// Adjusts the size of the next read based on how the last read performed:
    /// halves it (down to <see cref="MinBufferSize"/>) after three consecutive reads
    /// slower than 100ms, doubles it (up to <see cref="MaxBufferSize"/>) after a read
    /// faster than 20ms that filled the requested size.
    /// </summary>
    /// <param name="bytesRead">Number of bytes returned by the last read.</param>
    /// <param name="readTimeMs">Elapsed time of the last read in milliseconds.</param>
    private void AdaptBufferSize(int bytesRead, long readTimeMs)
    {
        // Adaptive buffer sizing based on network performance
        if (readTimeMs > 100) // Slow read (>100ms)
        {
            _consecutiveSlowReads++;
            if (_consecutiveSlowReads >= 3 && _currentBufferSize > MinBufferSize)
            {
                // Reduce buffer size for slow connections
                _currentBufferSize = Math.Max(MinBufferSize, _currentBufferSize / 2);
                _consecutiveSlowReads = 0;
            }
        }
        else if (readTimeMs < 20 && bytesRead == _currentBufferSize) // Fast read, buffer fully utilized
        {
            _consecutiveSlowReads = 0;
            if (_currentBufferSize < MaxBufferSize)
            {
                // Increase buffer size for fast connections
                _currentBufferSize = Math.Min(MaxBufferSize, _currentBufferSize * 2);
            }
        }
        else
        {
            _consecutiveSlowReads = 0;
        }
    }
}