refactor(split): rename DeepDrftWeb -> DeepDrftPublic and DeepDrftWeb.Client -> DeepDrftPublic.Client (Phase 4)
This commit is contained in:
@@ -0,0 +1,544 @@
|
||||
using DeepDrftModels.Entities;
|
||||
using DeepDrftPublic.Client.Clients;
|
||||
using System.Buffers;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace DeepDrftPublic.Client.Services;
|
||||
|
||||
public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerService
|
||||
{
|
||||
// Configuration constants
|
||||
private const int DefaultBufferSize = 32 * 1024; // 32KB chunks
|
||||
private const int NotificationThrottleMs = 100; // Throttle UI updates to max 10 per second
|
||||
|
||||
// Adaptive chunk sizing
|
||||
private const int MinBufferSize = 16 * 1024; // 16KB minimum
|
||||
private const int MaxBufferSize = 64 * 1024; // 64KB maximum
|
||||
private int _currentBufferSize = DefaultBufferSize;
|
||||
private int _consecutiveSlowReads = 0;
|
||||
|
||||
|
||||
// Streaming state properties
|
||||
public bool IsStreamingMode { get; private set; } = false;
|
||||
public bool CanStartStreaming { get; private set; } = false;
|
||||
public bool HeaderParsed { get; private set; } = false;
|
||||
public int BufferedChunks { get; private set; } = 0;
|
||||
public bool IsSeekingBeyondBuffer { get; private set; } = false;
|
||||
|
||||
private bool _streamingPlaybackStarted = false;
|
||||
private CancellationTokenSource? _streamingCancellation;
|
||||
private Task? _activeStreamingTask;
|
||||
private DateTime _lastNotification = DateTime.MinValue;
|
||||
private readonly ILogger<StreamingAudioPlayerService> _logger;
|
||||
private string? _currentTrackId;
|
||||
|
||||
public StreamingAudioPlayerService(
|
||||
AudioInteropService audioInterop,
|
||||
TrackMediaClient trackMediaClient,
|
||||
ILogger<StreamingAudioPlayerService> logger)
|
||||
: base(audioInterop, trackMediaClient)
|
||||
{
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public override async Task SelectTrack(TrackEntity track)
|
||||
{
|
||||
await SelectTrackStreaming(track);
|
||||
}
|
||||
|
||||
public async Task SelectTrackStreaming(TrackEntity track)
|
||||
{
|
||||
await EnsureInitializedAsync();
|
||||
|
||||
// Resume AudioContext immediately on track selection (user interaction) to avoid clicks later
|
||||
await _audioInterop.EnsureAudioContextReady(PlayerId);
|
||||
|
||||
await NotifyTrackSelected();
|
||||
|
||||
await LoadTrackStreaming(track);
|
||||
await NotifyStateChanged();
|
||||
}
|
||||
|
||||
private async Task LoadTrackStreaming(TrackEntity track)
|
||||
{
|
||||
// Always reset to clean state before loading new track. ResetToIdle
|
||||
// both cancels and awaits any in-flight streaming loop, so by the time
|
||||
// we return from it the previous loop is guaranteed to have exited and
|
||||
// there is no risk of interleaved ProcessStreamingChunk calls against
|
||||
// the single-instance JS StreamDecoder.
|
||||
await ResetToIdle();
|
||||
|
||||
// Save track ID for seek operations
|
||||
_currentTrackId = track.EntryKey;
|
||||
// Expose to UI immediately — Now-Playing surfaces should reflect the selected
|
||||
// track while it's still loading, not only after playback starts.
|
||||
CurrentTrack = track;
|
||||
|
||||
// Create new cancellation token for this streaming operation
|
||||
_streamingCancellation = new CancellationTokenSource();
|
||||
|
||||
try
|
||||
{
|
||||
// Set state to indicate loading has started
|
||||
ErrorMessage = null;
|
||||
LoadProgress = 0;
|
||||
IsLoading = true;
|
||||
IsStreamingMode = true;
|
||||
|
||||
// Reset adaptive buffer sizing
|
||||
_currentBufferSize = DefaultBufferSize;
|
||||
_consecutiveSlowReads = 0;
|
||||
|
||||
await NotifyStateChanged();
|
||||
|
||||
// Pass the streaming token to the HTTP layer so a navigation/track switch
|
||||
// aborts the server connection instead of leaving it draining bytes.
|
||||
var mediaResult = await _trackMediaClient.GetTrackMedia(
|
||||
track.EntryKey,
|
||||
byteOffset: 0,
|
||||
cancellationToken: _streamingCancellation.Token);
|
||||
if (!mediaResult.Success)
|
||||
{
|
||||
var technicalError = mediaResult.GetMessage();
|
||||
_logger.LogError("Failed to get track media for {TrackId}: {Error}",
|
||||
track.EntryKey, technicalError);
|
||||
ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
|
||||
return;
|
||||
}
|
||||
|
||||
if (mediaResult.Value == null)
|
||||
{
|
||||
const string technicalError = "No audio returned from server";
|
||||
_logger.LogError("No audio data returned for track {TrackId}", track.EntryKey);
|
||||
ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
|
||||
return;
|
||||
}
|
||||
|
||||
using var audio = mediaResult.Value;
|
||||
|
||||
// Initialize streaming mode with content length
|
||||
var streamingResult = await _audioInterop.InitializeStreaming(PlayerId, audio.ContentLength);
|
||||
if (!streamingResult.Success)
|
||||
{
|
||||
var technicalError = $"Failed to initialize streaming: {streamingResult.Error}";
|
||||
_logger.LogError("Streaming initialization failed for track {TrackId}: {Error}",
|
||||
track.EntryKey, technicalError);
|
||||
ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
|
||||
return;
|
||||
}
|
||||
|
||||
_activeStreamingTask = StreamAudioWithEarlyPlayback(audio, _streamingCancellation.Token);
|
||||
await _activeStreamingTask;
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// Cancellation is expected, reset state
|
||||
_logger.LogDebug("Audio streaming cancelled for track {TrackId}", track.EntryKey);
|
||||
IsLoaded = false;
|
||||
IsStreamingMode = false;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
StreamingErrorHandler.LogError(_logger, ex, "LoadTrackStreaming", track.EntryKey);
|
||||
ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(ex.Message);
|
||||
LoadProgress = 0;
|
||||
IsLoaded = false;
|
||||
IsStreamingMode = false;
|
||||
}
|
||||
finally
|
||||
{
|
||||
IsLoading = false;
|
||||
await NotifyStateChanged();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task StreamAudioWithEarlyPlayback(TrackMediaResponse audio, CancellationToken cancellationToken)
|
||||
{
|
||||
byte[]? buffer = null;
|
||||
try
|
||||
{
|
||||
long totalBytesRead = 0;
|
||||
buffer = ArrayPool<byte>.Shared.Rent(MaxBufferSize); // Rent larger buffer to accommodate adaptive sizing
|
||||
int currentBytes;
|
||||
var readTimer = System.Diagnostics.Stopwatch.StartNew();
|
||||
|
||||
do
|
||||
{
|
||||
readTimer.Restart();
|
||||
currentBytes = await audio.Stream.ReadAsync(buffer, 0, _currentBufferSize, cancellationToken);
|
||||
readTimer.Stop();
|
||||
|
||||
// Adapt buffer size based on read performance
|
||||
AdaptBufferSize(currentBytes, readTimer.ElapsedMilliseconds);
|
||||
|
||||
if (currentBytes > 0)
|
||||
{
|
||||
totalBytesRead += currentBytes;
|
||||
|
||||
// Always slice to the exact number of bytes read. The pooled buffer
|
||||
// is rented at MaxBufferSize and may carry stale bytes past
|
||||
// currentBytes from a prior rental — handing the full array to JS
|
||||
// interop would serialise that garbage into the audio stream.
|
||||
var actualBuffer = buffer.AsSpan(0, currentBytes).ToArray();
|
||||
|
||||
// Process chunk for streaming
|
||||
var chunkResult = await _audioInterop.ProcessStreamingChunk(PlayerId, actualBuffer);
|
||||
if (!chunkResult.Success)
|
||||
{
|
||||
var error = $"Failed to process streaming chunk: {chunkResult.Error}";
|
||||
_logger.LogWarning("Chunk processing failed: {Error}", error);
|
||||
throw new Exception(error);
|
||||
}
|
||||
|
||||
// Update streaming state
|
||||
CanStartStreaming = chunkResult.CanStartStreaming;
|
||||
HeaderParsed = chunkResult.HeaderParsed;
|
||||
BufferedChunks = chunkResult.BufferCount;
|
||||
|
||||
// Set duration from WAV header when available (only set once)
|
||||
if (chunkResult.Duration.HasValue && Duration == null)
|
||||
{
|
||||
Duration = chunkResult.Duration.Value;
|
||||
_logger.LogInformation("Duration set from WAV header: {Duration:F2} seconds", Duration);
|
||||
}
|
||||
|
||||
// Start playback as soon as we can
|
||||
if (!_streamingPlaybackStarted && CanStartStreaming)
|
||||
{
|
||||
var playbackResult = await _audioInterop.StartStreamingPlayback(PlayerId);
|
||||
if (playbackResult.Success)
|
||||
{
|
||||
_streamingPlaybackStarted = true;
|
||||
IsPlaying = true;
|
||||
IsPaused = false;
|
||||
IsLoaded = true; // Track is loaded and ready to play (even if still downloading)
|
||||
ErrorMessage = null;
|
||||
await NotifyStateChanged(); // Immediate notification for critical state change
|
||||
}
|
||||
else
|
||||
{
|
||||
var technicalError = $"Failed to start streaming playback: {playbackResult.Error}";
|
||||
_logger.LogError("Failed to start playback: {Error}", technicalError);
|
||||
ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
|
||||
}
|
||||
}
|
||||
|
||||
// Update progress
|
||||
if (audio.ContentLength > 0)
|
||||
{
|
||||
LoadProgress = Math.Min(1.0, (double)totalBytesRead / audio.ContentLength);
|
||||
}
|
||||
|
||||
await ThrottledNotifyStateChanged();
|
||||
}
|
||||
} while (currentBytes > 0);
|
||||
|
||||
// Notify the JS decoder that the stream is finished. When the server omits
|
||||
// Content-Length the StreamDecoder cannot determine completion via byte counting
|
||||
// alone; this explicit signal ensures the tail-decoding path (streamComplete=true)
|
||||
// fires regardless of whether Content-Length was present.
|
||||
await _audioInterop.MarkStreamCompleteAsync(PlayerId);
|
||||
|
||||
// Mark as fully loaded
|
||||
LoadProgress = 1.0;
|
||||
await NotifyStateChanged();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
StreamingErrorHandler.LogError(_logger, ex, "StreamAudioWithEarlyPlayback");
|
||||
ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(ex.Message);
|
||||
LoadProgress = 0;
|
||||
IsLoaded = false;
|
||||
IsStreamingMode = false;
|
||||
await NotifyStateChanged();
|
||||
throw;
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (buffer != null)
|
||||
{
|
||||
ArrayPool<byte>.Shared.Return(buffer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// In streaming mode, Stop fully resets to Idle state since audio data is consumed.
|
||||
/// This is equivalent to Unload for streaming playback.
|
||||
/// </summary>
|
||||
public override async Task Stop()
|
||||
{
|
||||
// In streaming mode, Stop = Unload (data is consumed, can't replay)
|
||||
await ResetToIdle();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Fully resets the player to Idle state, ready for a new track.
|
||||
/// </summary>
|
||||
public override async Task Unload()
|
||||
{
|
||||
await ResetToIdle();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Override Seek to handle seek-beyond-buffer for streaming mode.
|
||||
/// </summary>
|
||||
public override async Task Seek(double position)
|
||||
{
|
||||
if (!IsLoaded || !IsStreamingMode) return;
|
||||
|
||||
try
|
||||
{
|
||||
var result = await _audioInterop.SeekAsync(PlayerId, position);
|
||||
|
||||
if (result.Success)
|
||||
{
|
||||
if (result.SeekBeyondBuffer && result.ByteOffset > 0)
|
||||
{
|
||||
// Need to load new stream from offset
|
||||
_logger.LogInformation("Seeking beyond buffer to {Position:F2}s, byte offset: {ByteOffset}",
|
||||
position, result.ByteOffset);
|
||||
await SeekBeyondBuffer(position, result.ByteOffset);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Seek within buffer succeeded
|
||||
CurrentTime = position;
|
||||
ErrorMessage = null;
|
||||
await NotifyStateChanged();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ErrorMessage = $"Seek error: {result.Error}";
|
||||
await NotifyStateChanged();
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error seeking to position {Position}", position);
|
||||
ErrorMessage = $"Error seeking: {ex.Message}";
|
||||
await NotifyStateChanged();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handle seeking beyond the currently buffered content by requesting a new stream from offset.
|
||||
/// </summary>
|
||||
private async Task SeekBeyondBuffer(double seekPosition, long byteOffset)
|
||||
{
|
||||
if (string.IsNullOrEmpty(_currentTrackId))
|
||||
{
|
||||
ErrorMessage = "Cannot seek - no track loaded";
|
||||
return;
|
||||
}
|
||||
|
||||
IsSeekingBeyondBuffer = true;
|
||||
|
||||
// Cancel the current streaming loop AND wait for it to fully exit before
|
||||
// starting a new one. The previous loop's pending ReadAsync will throw
|
||||
// OperationCanceledException asynchronously; if we kick off a new loop
|
||||
// immediately, both can race against the single-instance JS StreamDecoder
|
||||
// and corrupt decode state. Draining here is the load-bearing guarantee.
|
||||
_streamingCancellation?.Cancel();
|
||||
await DrainActiveStreamingTaskAsync();
|
||||
_streamingCancellation?.Dispose();
|
||||
_streamingCancellation = new CancellationTokenSource();
|
||||
|
||||
try
|
||||
{
|
||||
// Update UI immediately
|
||||
CurrentTime = seekPosition;
|
||||
await NotifyStateChanged();
|
||||
|
||||
// Request new stream from offset
|
||||
var mediaResult = await _trackMediaClient.GetTrackMedia(
|
||||
_currentTrackId,
|
||||
byteOffset,
|
||||
cancellationToken: _streamingCancellation.Token);
|
||||
if (!mediaResult.Success || mediaResult.Value == null)
|
||||
{
|
||||
var technicalError = mediaResult.GetMessage() ?? "Failed to load audio from position";
|
||||
_logger.LogError("Failed to get track media from offset {Offset}: {Error}", byteOffset, technicalError);
|
||||
ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(technicalError);
|
||||
IsSeekingBeyondBuffer = false;
|
||||
return;
|
||||
}
|
||||
|
||||
using var audio = mediaResult.Value;
|
||||
|
||||
// Reinitialize JS player for offset streaming
|
||||
var reinitResult = await _audioInterop.ReinitializeFromOffset(PlayerId, audio.ContentLength, seekPosition);
|
||||
if (!reinitResult.Success)
|
||||
{
|
||||
_logger.LogError("Failed to reinitialize for offset streaming: {Error}", reinitResult.Error);
|
||||
ErrorMessage = "Failed to seek to position";
|
||||
IsSeekingBeyondBuffer = false;
|
||||
return;
|
||||
}
|
||||
|
||||
// Reset streaming state for new stream
|
||||
_streamingPlaybackStarted = false;
|
||||
CanStartStreaming = false;
|
||||
HeaderParsed = false;
|
||||
BufferedChunks = 0;
|
||||
|
||||
// Stream audio from offset
|
||||
_activeStreamingTask = StreamAudioWithEarlyPlayback(audio, _streamingCancellation.Token);
|
||||
await _activeStreamingTask;
|
||||
|
||||
IsSeekingBeyondBuffer = false;
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// Another seek or stop interrupted this one
|
||||
_logger.LogDebug("Seek beyond buffer cancelled");
|
||||
IsSeekingBeyondBuffer = false;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error during seek beyond buffer to position {Position}", seekPosition);
|
||||
ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(ex.Message);
|
||||
IsSeekingBeyondBuffer = false;
|
||||
await NotifyStateChanged();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Single method to reset all state - called by both Stop and Unload.
|
||||
/// </summary>
|
||||
private async Task ResetToIdle()
|
||||
{
|
||||
// 1. Cancel any ongoing streaming operation and wait for it to exit
|
||||
// before tearing down JS state. Otherwise the loop's pending
|
||||
// ProcessStreamingChunk call can land after StopAsync/UnloadAsync.
|
||||
_streamingCancellation?.Cancel();
|
||||
await DrainActiveStreamingTaskAsync();
|
||||
_streamingCancellation?.Dispose();
|
||||
_streamingCancellation = null;
|
||||
|
||||
// 2. Tell JS to stop and unload
|
||||
try
|
||||
{
|
||||
await _audioInterop.StopAsync(PlayerId);
|
||||
await _audioInterop.UnloadAsync(PlayerId);
|
||||
}
|
||||
catch
|
||||
{
|
||||
// Ignore JS errors during cleanup
|
||||
}
|
||||
|
||||
// 3. Reset ALL state to Idle
|
||||
IsPlaying = false;
|
||||
IsPaused = false;
|
||||
IsLoaded = false;
|
||||
IsLoading = false;
|
||||
CurrentTime = 0;
|
||||
Duration = null;
|
||||
LoadProgress = 0;
|
||||
ErrorMessage = null;
|
||||
CurrentTrack = null;
|
||||
|
||||
// 4. Reset streaming-specific state
|
||||
IsStreamingMode = false;
|
||||
CanStartStreaming = false;
|
||||
HeaderParsed = false;
|
||||
BufferedChunks = 0;
|
||||
_streamingPlaybackStarted = false;
|
||||
IsSeekingBeyondBuffer = false;
|
||||
_currentTrackId = null;
|
||||
|
||||
await NotifyStateChanged();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Wait for the previously-started streaming loop to fully exit. The caller
|
||||
/// must have already cancelled <see cref="_streamingCancellation"/>. Swallows
|
||||
/// the expected OperationCanceledException; any other exception was already
|
||||
/// surfaced through the loop's own catch block, so we ignore it here too.
|
||||
/// </summary>
|
||||
private async Task DrainActiveStreamingTaskAsync()
|
||||
{
|
||||
var task = _activeStreamingTask;
|
||||
if (task == null) return;
|
||||
|
||||
try
|
||||
{
|
||||
await task;
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// Expected when we cancelled the loop ourselves.
|
||||
}
|
||||
catch
|
||||
{
|
||||
// Any other failure was already logged inside the loop.
|
||||
}
|
||||
finally
|
||||
{
|
||||
// Only clear if we are still the active task — a concurrent caller
|
||||
// may have started a new stream while we were draining the old one.
|
||||
if (ReferenceEquals(_activeStreamingTask, task))
|
||||
{
|
||||
_activeStreamingTask = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ThrottledNotifyStateChanged()
|
||||
{
|
||||
var now = DateTime.UtcNow;
|
||||
if ((now - _lastNotification).TotalMilliseconds >= NotificationThrottleMs)
|
||||
{
|
||||
_lastNotification = now;
|
||||
await NotifyStateChanged();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// On component unmount we must cancel the in-flight streaming loop and tear
|
||||
/// down JS callbacks before the JS side's setInterval fires again with a
|
||||
/// stale DotNetObjectReference. ResetToIdle covers cancellation + JS stop
|
||||
/// + state reset; the base then disposes the JS player and its callbacks.
|
||||
/// </summary>
|
||||
public override async ValueTask DisposeAsync()
|
||||
{
|
||||
try
|
||||
{
|
||||
await ResetToIdle();
|
||||
}
|
||||
catch
|
||||
{
|
||||
// Disposal must not throw; any failure here is best-effort cleanup.
|
||||
}
|
||||
await base.DisposeAsync();
|
||||
}
|
||||
|
||||
private void AdaptBufferSize(int bytesRead, long readTimeMs)
|
||||
{
|
||||
// Adaptive buffer sizing based on network performance
|
||||
if (readTimeMs > 100) // Slow read (>100ms)
|
||||
{
|
||||
_consecutiveSlowReads++;
|
||||
if (_consecutiveSlowReads >= 3 && _currentBufferSize > MinBufferSize)
|
||||
{
|
||||
// Reduce buffer size for slow connections
|
||||
_currentBufferSize = Math.Max(MinBufferSize, _currentBufferSize / 2);
|
||||
_consecutiveSlowReads = 0;
|
||||
}
|
||||
}
|
||||
else if (readTimeMs < 20 && bytesRead == _currentBufferSize) // Fast read, buffer fully utilized
|
||||
{
|
||||
_consecutiveSlowReads = 0;
|
||||
if (_currentBufferSize < MaxBufferSize)
|
||||
{
|
||||
// Increase buffer size for fast connections
|
||||
_currentBufferSize = Math.Min(MaxBufferSize, _currentBufferSize * 2);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
_consecutiveSlowReads = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user