Fix truncated-segment and mid-stream failure paths in segmented loop

cursor>=totalLength is the sole forward-EOF test; a short non-final body is
a truncation error, not EOF. Mid-stream forward-load failures now invoke
RecoverFromFailedRefill so the scheduler halts instead of a silent false end.
Two regression tests pin both paths.
This commit is contained in:
daniel-c-harvey
2026-06-24 15:16:46 -04:00
parent 11faf8888f
commit e7762e35e8
2 changed files with 183 additions and 9 deletions
@@ -284,10 +284,26 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
catch (Exception ex) catch (Exception ex)
{ {
StreamingErrorHandler.LogError(_logger, ex, "LoadTrackStreaming", track.EntryKey); StreamingErrorHandler.LogError(_logger, ex, "LoadTrackStreaming", track.EntryKey);
ErrorMessage = StreamingErrorHandler.GetUserFriendlyMessage(ex.Message); var userError = StreamingErrorHandler.GetUserFriendlyMessage(ex.Message);
LoadProgress = 0;
IsLoaded = false; // Mid-stream failure (playback was already underway): halt the JS scheduler into a clean
IsStreamingMode = false; // paused-but-loaded state exactly as the seek path does via RecoverFromFailedRefill, rather
// than resetting to unloaded and letting the scheduler's buffered tail drain into a silent
// false end (AC6). Apply only when this load is still the active operation — a superseding
// seek owns state and has already replaced _streamingCancellation with its own CTS.
if (_streamingPlaybackStarted && ReferenceEquals(_streamingCancellation, loadCts))
{
await RecoverFromFailedRefill(CurrentTime, userError);
}
else
{
// First-segment failure (nothing buffered / playing yet) or superseded load: the
// normal unload-to-error path is correct — no buffered tail to halt.
ErrorMessage = userError;
LoadProgress = 0;
IsLoaded = false;
IsStreamingMode = false;
}
} }
finally finally
{ {
@@ -496,6 +512,8 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
CanStartStreaming = chunkResult.CanStartStreaming; CanStartStreaming = chunkResult.CanStartStreaming;
HeaderParsed = chunkResult.HeaderParsed; HeaderParsed = chunkResult.HeaderParsed;
BufferedChunks = chunkResult.BufferCount; BufferedChunks = chunkResult.BufferCount;
// chunkResult.ProductionPaused is informational only on this path — back-pressure
// granularity is one segment (the inter-segment gate below), not per-chunk.
// Set duration from header when available (only set once) // Set duration from header when available (only set once)
if (chunkResult.Duration.HasValue && Duration == null) if (chunkResult.Duration.HasValue && Duration == null)
@@ -561,15 +579,28 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
segment.Dispose(); segment.Dispose();
segment = null!; segment = null!;
// EOF: cursor reached the total, OR the server returned a short final slice (fewer // EOF: cursor reached the total file length. This is the sole forward-EOF condition.
// bytes than the segment we requested). Either way there is nothing left to fetch. // A short segment body (segmentBytesRead < SegmentSizeBytes) is NOT an EOF signal —
// the inner read loop fully drains the HTTP body, so a short body means the server
// sent fewer bytes than the bounded range we requested. While cursor < totalLength that
// can only be a connection drop / truncated stream, NOT the file tail — route it to
// the same clean-failure recovery as a fetch error rather than silently completing.
var reachedTotal = totalLength > 0 && cursor >= totalLength; var reachedTotal = totalLength > 0 && cursor >= totalLength;
var shortSegment = segmentBytesRead < SegmentSizeBytes; if (reachedTotal)
if (reachedTotal || shortSegment)
{ {
break; break;
} }
// Guard: if the body was short but we haven't reached totalLength, the stream was
// truncated mid-segment (connection drop / premature close). Surface as an error so
// the scheduler is halted rather than left to drain its buffered tail into a false end.
if (segmentBytesRead < SegmentSizeBytes)
{
throw new Exception(
$"Stream truncated at byte {cursor} of {totalLength}: received {segmentBytesRead} bytes " +
$"but expected up to {SegmentSizeBytes} and have not reached EOF");
}
// Inter-segment back-pressure gate (Phase 21.2 fill signal, now gating SEGMENT FETCH // Inter-segment back-pressure gate (Phase 21.2 fill signal, now gating SEGMENT FETCH
// instead of pacing ReadAsync on an open stream). Do not fetch the next segment while // instead of pacing ReadAsync on an open stream). Do not fetch the next segment while
// the scheduler is over high-water; wait until it drains below low-water. Because the // the scheduler is over high-water; wait until it drains below low-water. Because the
@@ -849,9 +880,12 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
} }
// Settle C# into the matching recoverable state: not playing, paused at the target, still loaded. // Settle C# into the matching recoverable state: not playing, paused at the target, still loaded.
// IsLoaded = true is load-bearing — the "paused-but-loaded" contract lets the listener retry
// the seek or pick another track; resetting to unloaded would evict the track identity.
ErrorMessage = userFacingError; ErrorMessage = userFacingError;
IsPlaying = false; IsPlaying = false;
IsPaused = true; IsPaused = true;
IsLoaded = true;
CurrentTime = seekPosition; CurrentTime = seekPosition;
IsSeekingBeyondBuffer = false; IsSeekingBeyondBuffer = false;
await NotifyStateChanged(); await NotifyStateChanged();
+141 -1
View File
@@ -64,6 +64,84 @@ public class SegmentedStreamLoopTests
} }
} }
// Serves the first segment normally, then truncates subsequent segment bodies to a short slice
// (Content-Range reports the correct total, but the HTTP body ends early — simulating a
// connection drop mid-segment while cursor < totalLength).
private sealed class TruncatingAfterFirstSegmentServer : HttpMessageHandler
{
private readonly long _total;
private readonly long _truncatedBodyBytes; // bytes to actually deliver for non-first segments
private int _audioRequestCount;
public TruncatingAfterFirstSegmentServer(long total, long truncatedBodyBytes)
{
_total = total;
_truncatedBodyBytes = truncatedBodyBytes;
}
protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
var path = request.RequestUri!.AbsolutePath;
if (path.EndsWith("/waveform") || path.Contains("/opus/"))
return Task.FromResult(new HttpResponseMessage(HttpStatusCode.NotFound));
var rangeItem = request.Headers.Range!.Ranges.First();
var from = rangeItem.From ?? 0;
var to = rangeItem.To ?? (_total - 1);
if (to > _total - 1) to = _total - 1;
var requestIndex = Interlocked.Increment(ref _audioRequestCount);
// First segment (requestIndex == 1): serve fully. Subsequent segments: truncate.
var fullSliceLength = to - from + 1;
var bodyLength = requestIndex == 1 ? fullSliceLength : Math.Min(_truncatedBodyBytes, fullSliceLength);
var body = new byte[bodyLength];
var response = new HttpResponseMessage(HttpStatusCode.PartialContent)
{
Content = new ByteArrayContent(body),
};
// Content-Range always reports the true full total — the truncation is in the body, not the header.
response.Content.Headers.ContentRange = new ContentRangeHeaderValue(from, to, _total);
response.Content.Headers.ContentType = new MediaTypeHeaderValue("audio/wav");
return Task.FromResult(response);
}
}
// Serves the first segment normally, then returns HTTP 500 for all subsequent requests —
// simulating a mid-stream fetch failure after playback is underway.
private sealed class FailingAfterFirstSegmentServer : HttpMessageHandler
{
private readonly long _total;
private int _audioRequestCount;
public FailingAfterFirstSegmentServer(long total) => _total = total;
protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
var path = request.RequestUri!.AbsolutePath;
if (path.EndsWith("/waveform") || path.Contains("/opus/"))
return Task.FromResult(new HttpResponseMessage(HttpStatusCode.NotFound));
var requestIndex = Interlocked.Increment(ref _audioRequestCount);
if (requestIndex > 1)
return Task.FromResult(new HttpResponseMessage(HttpStatusCode.InternalServerError));
var rangeItem = request.Headers.Range!.Ranges.First();
var from = rangeItem.From ?? 0;
var to = rangeItem.To ?? (_total - 1);
if (to > _total - 1) to = _total - 1;
var body = new byte[to - from + 1];
var response = new HttpResponseMessage(HttpStatusCode.PartialContent)
{
Content = new ByteArrayContent(body),
};
response.Content.Headers.ContentRange = new ContentRangeHeaderValue(from, to, _total);
response.Content.Headers.ContentType = new MediaTypeHeaderValue("audio/wav");
return Task.FromResult(response);
}
}
private sealed class SingleClientFactory : IHttpClientFactory private sealed class SingleClientFactory : IHttpClientFactory
{ {
private readonly HttpMessageHandler _handler; private readonly HttpMessageHandler _handler;
@@ -97,6 +175,7 @@ public class SegmentedStreamLoopTests
public int ReinitCallCount { get; private set; } public int ReinitCallCount { get; private set; }
public int MarkCompleteCallCount { get; private set; } public int MarkCompleteCallCount { get; private set; }
public int IsProductionPausedCallCount { get; private set; } public int IsProductionPausedCallCount { get; private set; }
public int RecoverCallCount { get; private set; }
public ValueTask<TValue> InvokeAsync<TValue>(string identifier, object?[]? args) public ValueTask<TValue> InvokeAsync<TValue>(string identifier, object?[]? args)
{ {
@@ -131,6 +210,9 @@ public class SegmentedStreamLoopTests
case "DeepDrftAudio.markStreamComplete": case "DeepDrftAudio.markStreamComplete":
MarkCompleteCallCount++; MarkCompleteCallCount++;
return (ValueTask<TValue>)(object)ValueTask.FromResult(new StreamingResult { Success = true }); return (ValueTask<TValue>)(object)ValueTask.FromResult(new StreamingResult { Success = true });
case "DeepDrftAudio.recoverFromFailedRefill":
RecoverCallCount++;
return Result<TValue>(true);
default: default:
// createPlayer / setOnProgressCallback / setOnEndCallback / setVolume / // createPlayer / setOnProgressCallback / setOnEndCallback / setVolume /
// ensureAudioContextReady / initializeStreaming / startStreamingPlayback / // ensureAudioContextReady / initializeStreaming / startStreamingPlayback /
@@ -151,7 +233,7 @@ public class SegmentedStreamLoopTests
private static TrackDto Track() => new() { EntryKey = "mix-1", TrackName = "Long Mix" }; private static TrackDto Track() => new() { EntryKey = "mix-1", TrackName = "Long Mix" };
private static StreamingAudioPlayerService BuildPlayer(SegmentServer server, FakeJsRuntime js) private static StreamingAudioPlayerService BuildPlayer(HttpMessageHandler server, FakeJsRuntime js)
{ {
var interop = new AudioInteropService(js); var interop = new AudioInteropService(js);
var media = new TrackMediaClient(new SingleClientFactory(server)); var media = new TrackMediaClient(new SingleClientFactory(server));
@@ -248,6 +330,64 @@ public class SegmentedStreamLoopTests
}); });
} }
[Test]
public async Task MidStream_TruncatedSegment_RoutesToRecovery_NotSilentEof()
{
// 10 MB file → 3 segments. The second segment's body is short (1 MB instead of 4 MB) while
// cursor < totalLength — simulates a connection drop mid-segment. The loop must NOT treat this
// as EOF (must not call MarkStreamComplete) and must route to recovery (scheduler halted) so
// the buffered tail cannot drain into a silent false end.
var total = 10L * 1024 * 1024;
var truncatedBodyBytes = 1L * 1024 * 1024; // 1 MB short body for segment 2
var server = new TruncatingAfterFirstSegmentServer(total, truncatedBodyBytes);
var js = new FakeJsRuntime();
var player = BuildPlayer(server, js);
await player.SelectTrackStreaming(Track());
Assert.Multiple(() =>
{
Assert.That(js.MarkCompleteCallCount, Is.Zero,
"a truncated non-final segment must NOT be reported as a clean EOF — MarkStreamComplete must not fire");
Assert.That(js.RecoverCallCount, Is.EqualTo(1),
"a truncated segment while cursor < totalLength is a failure: scheduler must be halted via recovery");
Assert.That(player.IsLoaded, Is.True,
"recovery leaves the track loaded so the listener can retry — not torn down to unloaded");
Assert.That(player.IsPaused, Is.True,
"recovery settles into a paused state, not playing");
Assert.That(player.ErrorMessage, Is.Not.Null.And.Not.Empty,
"recovery surfaces an error message to the UI");
});
}
[Test]
public async Task MidStream_SegmentFetchFailure_RoutesToRecovery_NotSilentFalseEnd()
{
// 10 MB file → 3 segments. The second segment fetch fails (HTTP 500), simulating a network
// error after playback is already underway. The loop must halt the JS scheduler via recovery
// rather than letting the buffered first-segment tail drain into a silent false end (AC6).
var total = 10L * 1024 * 1024;
var server = new FailingAfterFirstSegmentServer(total);
var js = new FakeJsRuntime();
var player = BuildPlayer(server, js);
await player.SelectTrackStreaming(Track());
Assert.Multiple(() =>
{
Assert.That(js.MarkCompleteCallCount, Is.Zero,
"a mid-stream fetch failure must not report clean EOF — MarkStreamComplete must not fire");
Assert.That(js.RecoverCallCount, Is.EqualTo(1),
"a mid-stream fetch failure must halt the scheduler via recovery, not leave it to drain");
Assert.That(player.IsLoaded, Is.True,
"recovery leaves the track loaded so the listener can retry — not torn down to unloaded");
Assert.That(player.IsPaused, Is.True,
"recovery settles into a paused state, not playing");
Assert.That(player.ErrorMessage, Is.Not.Null.And.Not.Empty,
"recovery surfaces an error message to the UI");
});
}
[Test] [Test]
public async Task SeekBeyondBuffer_ReinitsOnceThenSegmentsForwardFromOffset() public async Task SeekBeyondBuffer_ReinitsOnceThenSegmentsForwardFromOffset()
{ {