Files
deepdrft/DeepDrftTests/SegmentedStreamLoopTests.cs
T
daniel-c-harvey cc9d20184d Restore IsStreamingMode on recovery; guard superseded-load else-branch
RecoverFromFailedRefill now sets IsStreamingMode=true so the in-place
seek-retry route isn't wedged. The generic-catch unload path is gated on
the loadCts identity, so a superseded load no longer clobbers a newer
operation's state.
2026-06-24 15:37:38 -04:00

428 lines
21 KiB
C#

using System.Collections.Concurrent;
using System.Net;
using System.Net.Http.Headers;
using DeepDrftModels.DTOs;
using DeepDrftPublic.Client.Clients;
using DeepDrftPublic.Client.Services;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.JSInterop;
namespace DeepDrftTests;
/// <summary>
/// Phase 21 Direction B — the segmented forward read loop in <see cref="StreamingAudioPlayerService"/>.
/// Drives a real <c>SelectTrackStreaming</c> against a fake JS runtime and a scripted HTTP handler that
/// serves bounded 206 segments, then asserts the loop's contract:
/// <list type="bullet">
/// <item>forward playback fetches in bounded <c>bytes=start-end</c> segments (the network-memory bound);</item>
/// <item>the cursor advances contiguously across segment boundaries until the file total is reached (EOF);</item>
/// <item>the next segment is NOT fetched while the scheduler reports production paused (the fill gate);</item>
/// <item>a seek converges onto the SAME segment loop — reinit then continued segmentation, no forked path.</item>
/// </list>
/// True network/browser memory behaviour is Daniel's manual re-run; this pins the request sequencing and
/// gating the harness can observe.
/// </summary>
[TestFixture]
public class SegmentedStreamLoopTests
{
private const long SegmentSize = 4 * 1024 * 1024;
// Records every audio Range request and serves a bounded 206 slice. Audio bodies are zero-filled —
// the fake JS decoder does not inspect bytes; it scripts canStart/productionPaused directly.
// A range that starts at or past the end of the file is answered with 416 instead of blowing up
// inside the handler.
private sealed class SegmentServer : HttpMessageHandler
{
    private readonly long _total;

    // Dedicated monitor: AudioRanges is publicly reachable, so it must not double as the lock target.
    private readonly object _gate = new();

    /// <summary>
    /// Every audio Range request seen, in arrival order. <c>From</c> defaults to 0 when the client
    /// omitted it; <c>To</c> is the end the client asked for, not the clamped end that was served.
    /// </summary>
    public List<(long From, long? To)> AudioRanges { get; } = new();

    /// <param name="total">Total length in bytes of the simulated audio file.</param>
    public SegmentServer(long total) => _total = total;

    /// <summary>
    /// Answers waveform/sidecar paths with 404 and every other request as a ranged audio fetch:
    /// 206 with a zero-filled body clamped to the file length, or 416 when the range starts past EOF.
    /// </summary>
    protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        var path = request.RequestUri!.AbsolutePath;

        // Waveform profile + sidecar fetches are best-effort side calls — 404 them so the load
        // path falls back cleanly and the test stays focused on the audio segment loop.
        // Ordinal comparison: these are URL fragments, not linguistic text.
        if (path.EndsWith("/waveform", StringComparison.Ordinal) || path.Contains("/opus/", StringComparison.Ordinal))
        {
            return Task.FromResult(new HttpResponseMessage(HttpStatusCode.NotFound));
        }

        var rangeItem = request.Headers.Range!.Ranges.First();
        var from = rangeItem.From ?? 0;
        var to = Math.Min(rangeItem.To ?? (_total - 1), _total - 1);

        lock (_gate)
        {
            AudioRanges.Add((from, rangeItem.To));
        }

        // Nothing to serve when the range starts beyond the last byte. ContentRangeHeaderValue
        // rejects from > to, so respond the way an HTTP server would: 416 with "bytes */total".
        if (from > to)
        {
            var unsatisfiable = new HttpResponseMessage(HttpStatusCode.RequestedRangeNotSatisfiable)
            {
                Content = new ByteArrayContent(Array.Empty<byte>()),
            };
            unsatisfiable.Content.Headers.ContentRange = new ContentRangeHeaderValue(_total);
            return Task.FromResult(unsatisfiable);
        }

        var body = new byte[to - from + 1];
        var response = new HttpResponseMessage(HttpStatusCode.PartialContent)
        {
            Content = new ByteArrayContent(body),
        };
        response.Content.Headers.ContentRange = new ContentRangeHeaderValue(from, to, _total);
        response.Content.Headers.ContentType = new MediaTypeHeaderValue("audio/wav");
        return Task.FromResult(response);
    }
}
// Serves the first segment normally, then truncates subsequent segment bodies to a short slice
// (Content-Range reports the correct total, but the HTTP body ends early — simulating a
// connection drop mid-segment while cursor < totalLength). A range that starts at or past the
// end of the file is answered with 416 rather than failing on a negative body length.
private sealed class TruncatingAfterFirstSegmentServer : HttpMessageHandler
{
    private readonly long _total;
    private readonly long _truncatedBodyBytes; // bytes to actually deliver for non-first segments
    private int _audioRequestCount;

    /// <param name="total">Total length in bytes of the simulated audio file.</param>
    /// <param name="truncatedBodyBytes">Upper bound on the body size of every audio response after the first.</param>
    public TruncatingAfterFirstSegmentServer(long total, long truncatedBodyBytes)
    {
        _total = total;
        _truncatedBodyBytes = truncatedBodyBytes;
    }

    /// <summary>
    /// Answers waveform/sidecar paths with 404. Audio requests get a 206 whose Content-Range
    /// describes the full clamped slice; only the first audio request receives the full body.
    /// </summary>
    protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        var path = request.RequestUri!.AbsolutePath;

        // Ordinal comparison: these are URL fragments, not linguistic text.
        if (path.EndsWith("/waveform", StringComparison.Ordinal) || path.Contains("/opus/", StringComparison.Ordinal))
        {
            return Task.FromResult(new HttpResponseMessage(HttpStatusCode.NotFound));
        }

        var rangeItem = request.Headers.Range!.Ranges.First();
        var from = rangeItem.From ?? 0;
        var to = Math.Min(rangeItem.To ?? (_total - 1), _total - 1);

        // Every audio request counts toward the index, including one rejected with 416 below.
        var requestIndex = Interlocked.Increment(ref _audioRequestCount);

        // A range starting beyond the last byte has no slice to serve; a negative array length or
        // an inverted Content-Range would throw, so answer 416 with "bytes */total" instead.
        if (from > to)
        {
            var unsatisfiable = new HttpResponseMessage(HttpStatusCode.RequestedRangeNotSatisfiable)
            {
                Content = new ByteArrayContent(Array.Empty<byte>()),
            };
            unsatisfiable.Content.Headers.ContentRange = new ContentRangeHeaderValue(_total);
            return Task.FromResult(unsatisfiable);
        }

        // First segment (requestIndex == 1): serve fully. Subsequent segments: truncate.
        var fullSliceLength = to - from + 1;
        var bodyLength = requestIndex == 1 ? fullSliceLength : Math.Min(_truncatedBodyBytes, fullSliceLength);
        var body = new byte[bodyLength];
        var response = new HttpResponseMessage(HttpStatusCode.PartialContent)
        {
            Content = new ByteArrayContent(body),
        };

        // Content-Range always reports the true full total — the truncation is in the body, not the header.
        response.Content.Headers.ContentRange = new ContentRangeHeaderValue(from, to, _total);
        response.Content.Headers.ContentType = new MediaTypeHeaderValue("audio/wav");
        return Task.FromResult(response);
    }
}
// Serves the first segment normally, then returns HTTP 500 for all subsequent requests —
// simulating a mid-stream fetch failure after playback is underway. A first request whose range
// starts at or past the end of the file is answered with 416 rather than failing on a negative
// body length.
private sealed class FailingAfterFirstSegmentServer : HttpMessageHandler
{
    private readonly long _total;
    private int _audioRequestCount;

    /// <param name="total">Total length in bytes of the simulated audio file.</param>
    public FailingAfterFirstSegmentServer(long total) => _total = total;

    /// <summary>
    /// Answers waveform/sidecar paths with 404, the first audio request with a clamped 206 slice,
    /// and every later audio request with 500.
    /// </summary>
    protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        var path = request.RequestUri!.AbsolutePath;

        // Ordinal comparison: these are URL fragments, not linguistic text.
        if (path.EndsWith("/waveform", StringComparison.Ordinal) || path.Contains("/opus/", StringComparison.Ordinal))
        {
            return Task.FromResult(new HttpResponseMessage(HttpStatusCode.NotFound));
        }

        // Only audio requests are counted; the side calls above return before reaching this point.
        var requestIndex = Interlocked.Increment(ref _audioRequestCount);
        if (requestIndex > 1)
        {
            return Task.FromResult(new HttpResponseMessage(HttpStatusCode.InternalServerError));
        }

        var rangeItem = request.Headers.Range!.Ranges.First();
        var from = rangeItem.From ?? 0;
        var to = Math.Min(rangeItem.To ?? (_total - 1), _total - 1);

        // A range starting beyond the last byte has no slice to serve; a negative array length or
        // an inverted Content-Range would throw, so answer 416 with "bytes */total" instead.
        if (from > to)
        {
            var unsatisfiable = new HttpResponseMessage(HttpStatusCode.RequestedRangeNotSatisfiable)
            {
                Content = new ByteArrayContent(Array.Empty<byte>()),
            };
            unsatisfiable.Content.Headers.ContentRange = new ContentRangeHeaderValue(_total);
            return Task.FromResult(unsatisfiable);
        }

        var body = new byte[to - from + 1];
        var response = new HttpResponseMessage(HttpStatusCode.PartialContent)
        {
            Content = new ByteArrayContent(body),
        };
        response.Content.Headers.ContentRange = new ContentRangeHeaderValue(from, to, _total);
        response.Content.Headers.ContentType = new MediaTypeHeaderValue("audio/wav");
        return Task.FromResult(response);
    }
}
/// <summary>
/// <see cref="IHttpClientFactory"/> stub that routes every client, whatever its name, through one
/// supplied handler and a fixed <c>https://content.test/</c> base address.
/// </summary>
private sealed class SingleClientFactory : IHttpClientFactory
{
    private readonly HttpMessageHandler _handler;

    public SingleClientFactory(HttpMessageHandler handler)
    {
        _handler = handler;
    }

    /// <summary>Creates a new client over the shared handler; <paramref name="name"/> is ignored.</summary>
    public HttpClient CreateClient(string name)
    {
        // disposeHandler: false — disposing a client must not dispose the handler it shares.
        var client = new HttpClient(_handler, disposeHandler: false);
        client.BaseAddress = new Uri("https://content.test/");
        return client;
    }
}
/// <summary>
/// Scriptable JS runtime. processStreamingChunk reports canStart=true immediately (so playback
/// starts on the first chunk) and a productionPaused value pulled from a delegate the test controls;
/// isProductionPaused (the inter-segment poll) reads a separate delegate so a test can hold the gate
/// closed for N polls then release it. Counts reinitializeFromOffset / markStreamComplete /
/// recoverFromFailedRefill / isProductionPaused calls. Call arguments are never inspected.
/// </summary>
private sealed class FakeJsRuntime : IJSRuntime
{
    private readonly Func<bool> _chunkProductionPaused;
    private readonly Func<bool> _isProductionPaused;
    private readonly long? _seekByteOffset;

    /// <param name="chunkProductionPaused">Supplies ProductionPaused for each processStreamingChunk result; defaults to always false.</param>
    /// <param name="isProductionPaused">Supplies the answer to each isProductionPaused poll; defaults to always false.</param>
    /// <param name="seekByteOffset">When set, every seek reports seek-beyond-buffer at this byte offset; when null, seek reports plain success.</param>
    public FakeJsRuntime(
        Func<bool>? chunkProductionPaused = null,
        Func<bool>? isProductionPaused = null,
        long? seekByteOffset = null)
    {
        _chunkProductionPaused = chunkProductionPaused ?? (() => false);
        _isProductionPaused = isProductionPaused ?? (() => false);
        _seekByteOffset = seekByteOffset;
    }

    public int ReinitCallCount { get; private set; }
    public int MarkCompleteCallCount { get; private set; }
    public int IsProductionPausedCallCount { get; private set; }
    public int RecoverCallCount { get; private set; }

    /// <summary>
    /// Dispatches on the JS identifier and returns an already-completed scripted result.
    /// Unknown identifiers succeed generically: a successful <c>AudioOperationResult</c> when that
    /// is the requested type, otherwise <c>default(TValue)</c>.
    /// </summary>
    public ValueTask<TValue> InvokeAsync<TValue>(string identifier, object?[]? args)
    {
        switch (identifier)
        {
            case "DeepDrftAudio.isReady":
                return Ok<TValue>(true);

            case "DeepDrftAudio.canDecodeOggOpus":
                return Ok<TValue>(false); // force the lossless path — no sidecar dance

            case "DeepDrftAudio.isProductionPaused":
                IsProductionPausedCallCount++;
                return Ok<TValue>(_isProductionPaused());

            case "DeepDrftAudio.processStreamingChunk":
                return Ok<TValue>(new StreamingResult
                {
                    Success = true,
                    CanStartStreaming = true,
                    HeaderParsed = true,
                    BufferCount = 8,
                    Duration = 600,
                    ProductionPaused = _chunkProductionPaused(),
                });

            case "DeepDrftAudio.seek":
                // When a seek offset is scripted, report seek-beyond-buffer so Seek() routes into
                // SeekBeyondBuffer → the shared segment loop with a continuation reinit.
                return Ok<TValue>(_seekByteOffset is { } off
                    ? new SeekResult { Success = true, SeekBeyondBuffer = true, ByteOffset = off }
                    : new SeekResult { Success = true });

            case "DeepDrftAudio.reinitializeFromOffset":
                ReinitCallCount++;
                return Result<TValue>(true);

            case "DeepDrftAudio.markStreamComplete":
                MarkCompleteCallCount++;
                return Ok<TValue>(new StreamingResult { Success = true });

            case "DeepDrftAudio.recoverFromFailedRefill":
                RecoverCallCount++;
                return Result<TValue>(true);

            default:
                // createPlayer / setOnProgressCallback / setOnEndCallback / setVolume /
                // ensureAudioContextReady / initializeStreaming / startStreamingPlayback /
                // stop / unload / disposePlayer → generic success.
                if (typeof(TValue) == typeof(AudioOperationResult))
                {
                    return Result<TValue>(true);
                }

                // default(TValue), not a null object: unboxing null into a value-type TValue
                // (bool, int, …) would throw NullReferenceException.
                return ValueTask.FromResult(default(TValue)!);
        }
    }

    /// <summary>Cancellation-aware overload; the token is ignored and the call is forwarded unchanged.</summary>
    public ValueTask<TValue> InvokeAsync<TValue>(string identifier, CancellationToken cancellationToken, object?[]? args)
        => InvokeAsync<TValue>(identifier, args);

    // Wraps a scripted value as a completed ValueTask; the runtime type of value must be castable to TValue.
    private static ValueTask<TValue> Ok<TValue>(object? value) => ValueTask.FromResult((TValue)value!);

    // Wraps an AudioOperationResult with the given success flag; only valid when TValue is that type.
    private static ValueTask<TValue> Result<TValue>(bool success) =>
        ValueTask.FromResult((TValue)(object)new AudioOperationResult { Success = success });
}
/// <summary>Builds the track every test streams: entry key "mix-1", name "Long Mix".</summary>
private static TrackDto Track()
{
    return new TrackDto
    {
        EntryKey = "mix-1",
        TrackName = "Long Mix",
    };
}
/// <summary>
/// Wires a <see cref="StreamingAudioPlayerService"/> over the fake JS runtime and a media client
/// whose HTTP traffic all goes through <paramref name="server"/>; logging is discarded.
/// </summary>
private static StreamingAudioPlayerService BuildPlayer(HttpMessageHandler server, FakeJsRuntime js) =>
    new(
        new AudioInteropService(js),
        new TrackMediaClient(new SingleClientFactory(server)),
        NullLogger<StreamingAudioPlayerService>.Instance);
[Test]
public async Task ForwardPlayback_FetchesBoundedSegments_AdvancingCursorToEof()
{
    // Arrange — a 10 MB file at the 4 MB segment size needs three requests (4 MB, 4 MB, 2 MB tail).
    // Nothing reports production paused, so the loop drains straight through.
    const long fileLength = 10L * 1024 * 1024;
    var handler = new SegmentServer(fileLength);
    var runtime = new FakeJsRuntime();
    var streamingPlayer = BuildPlayer(handler, runtime);

    // Requests are recorded as asked, so even the tail request has the full bounded shape.
    (long, long?) firstSegment = (0L, SegmentSize - 1);
    (long, long?) secondSegment = (SegmentSize, 2 * SegmentSize - 1);
    (long, long?) thirdSegment = (2 * SegmentSize, 3 * SegmentSize - 1);

    // Act
    await streamingPlayer.SelectTrackStreaming(Track());

    // Assert
    Assert.Multiple(() =>
    {
        Assert.That(handler.AudioRanges, Has.Count.EqualTo(3),
            "a 10 MB file at a 4 MB segment size is fetched as three bounded segments");

        // Contiguous, bounded segments advancing the cursor to EOF.
        Assert.That(handler.AudioRanges[0], Is.EqualTo(firstSegment));
        Assert.That(handler.AudioRanges[1], Is.EqualTo(secondSegment));
        Assert.That(handler.AudioRanges[2], Is.EqualTo(thirdSegment));

        Assert.That(runtime.MarkCompleteCallCount, Is.EqualTo(1), "stream-complete fires once at true EOF, not per segment");
    });
}
[Test]
public async Task ForwardPlayback_DoesNotFetchNextSegment_WhileProductionPaused()
{
    // Arrange — every chunk result reports paused=true, which sends the loop into the inter-segment
    // gate. The poll answers "paused" on its first two calls and "clear" from the third on, so the
    // following segment is held back rather than dropped.
    const long fileLength = 10L * 1024 * 1024;
    var handler = new SegmentServer(fileLength);

    var pollsSeen = 0;
    bool GateStillClosed()
    {
        pollsSeen++;
        return pollsSeen < 3; // paused for polls 1,2; clear on poll 3
    }

    var runtime = new FakeJsRuntime(
        chunkProductionPaused: () => true,
        isProductionPaused: GateStillClosed);
    var streamingPlayer = BuildPlayer(handler, runtime);

    // Act
    await streamingPlayer.SelectTrackStreaming(Track());

    // Assert
    Assert.Multiple(() =>
    {
        Assert.That(runtime.IsProductionPausedCallCount, Is.GreaterThanOrEqualTo(3),
            "the inter-segment gate must poll the fill signal while paused, not fetch immediately");
        Assert.That(handler.AudioRanges, Has.Count.EqualTo(3),
            "once the gate clears, segmentation resumes and still reaches EOF — paused delays, never skips");
    });
}
[Test]
public async Task SmallFile_FetchedInOneShortSegment_NoSecondFetch()
{
    // Arrange — 2 MB is smaller than one 4 MB segment, so the single bounded request comes back
    // short and that must be treated as EOF without a follow-up fetch.
    const long fileLength = 2L * 1024 * 1024;
    var handler = new SegmentServer(fileLength);
    var runtime = new FakeJsRuntime();
    var streamingPlayer = BuildPlayer(handler, runtime);
    (long, long?) onlySegment = (0L, SegmentSize - 1);

    // Act
    await streamingPlayer.SelectTrackStreaming(Track());

    // Assert
    Assert.Multiple(() =>
    {
        Assert.That(handler.AudioRanges, Has.Count.EqualTo(1), "a sub-segment file needs exactly one fetch");
        Assert.That(handler.AudioRanges[0], Is.EqualTo(onlySegment),
            "the request is still the bounded segment shape; the server returns the short available tail");
        Assert.That(runtime.MarkCompleteCallCount, Is.EqualTo(1));
    });
}
[Test]
public async Task ForwardLoad_NeverReinitializesDecoder()
{
    // Arrange — 20 MB at the 4 MB segment size is five forward segments, with no seek involved.
    const long fileLength = 20L * 1024 * 1024;
    var handler = new SegmentServer(fileLength);
    var runtime = new FakeJsRuntime();
    var streamingPlayer = BuildPlayer(handler, runtime);

    // Act
    await streamingPlayer.SelectTrackStreaming(Track());

    // Assert
    Assert.Multiple(() =>
    {
        Assert.That(handler.AudioRanges, Has.Count.EqualTo(5), "20 MB / 4 MB = five contiguous forward segments");
        Assert.That(runtime.ReinitCallCount, Is.Zero,
            "a forward load from byte 0 never reinitializes the decoder — reinit is the seek-only continuation step");
    });
}
[Test]
public async Task MidStream_TruncatedSegment_RoutesToRecovery_NotSilentEof()
{
    // Arrange — a 10 MB file would take three segments. The server delivers the first in full and
    // caps every later body at 1 MB while still declaring the full range, which looks like a
    // connection dropping mid-segment with the cursor short of the total. That must be handled as
    // a failure (recovery), never as a clean end of stream.
    const long fileLength = 10L * 1024 * 1024;
    const long shortBodyLength = 1L * 1024 * 1024; // 1 MB short body for segment 2
    var handler = new TruncatingAfterFirstSegmentServer(fileLength, shortBodyLength);
    var runtime = new FakeJsRuntime();
    var streamingPlayer = BuildPlayer(handler, runtime);

    // Act
    await streamingPlayer.SelectTrackStreaming(Track());

    // Assert
    Assert.Multiple(() =>
    {
        Assert.That(runtime.MarkCompleteCallCount, Is.Zero,
            "a truncated non-final segment must NOT be reported as a clean EOF — MarkStreamComplete must not fire");
        Assert.That(runtime.RecoverCallCount, Is.EqualTo(1),
            "a truncated segment while cursor < totalLength is a failure: scheduler must be halted via recovery");
        Assert.That(streamingPlayer.IsLoaded, Is.True,
            "recovery leaves the track loaded so the listener can retry — not torn down to unloaded");
        Assert.That(streamingPlayer.IsStreamingMode, Is.True,
            "recovery must restore IsStreamingMode=true so Seek() is not wedged (AC6 / Phase 21.3 retry contract)");
        Assert.That(streamingPlayer.IsPaused, Is.True,
            "recovery settles into a paused state, not playing");
        Assert.That(streamingPlayer.ErrorMessage, Is.Not.Null.And.Not.Empty,
            "recovery surfaces an error message to the UI");
    });
}
[Test]
public async Task MidStream_SegmentFetchFailure_RoutesToRecovery_NotSilentFalseEnd()
{
    // Arrange — a 10 MB file would take three segments. The server answers the first audio request
    // normally and every later one with HTTP 500, i.e. a network error once playback is underway.
    // The loop must go through recovery instead of letting the already-buffered audio run out as
    // if the track had ended (AC6).
    const long fileLength = 10L * 1024 * 1024;
    var handler = new FailingAfterFirstSegmentServer(fileLength);
    var runtime = new FakeJsRuntime();
    var streamingPlayer = BuildPlayer(handler, runtime);

    // Act
    await streamingPlayer.SelectTrackStreaming(Track());

    // Assert
    Assert.Multiple(() =>
    {
        Assert.That(runtime.MarkCompleteCallCount, Is.Zero,
            "a mid-stream fetch failure must not report clean EOF — MarkStreamComplete must not fire");
        Assert.That(runtime.RecoverCallCount, Is.EqualTo(1),
            "a mid-stream fetch failure must halt the scheduler via recovery, not leave it to drain");
        Assert.That(streamingPlayer.IsLoaded, Is.True,
            "recovery leaves the track loaded so the listener can retry — not torn down to unloaded");
        Assert.That(streamingPlayer.IsStreamingMode, Is.True,
            "recovery must restore IsStreamingMode=true so Seek() is not wedged (AC6 / Phase 21.3 retry contract)");
        Assert.That(streamingPlayer.IsPaused, Is.True,
            "recovery settles into a paused state, not playing");
        Assert.That(streamingPlayer.ErrorMessage, Is.Not.Null.And.Not.Empty,
            "recovery surfaces an error message to the UI");
    });
}
[Test]
public async Task SeekBeyondBuffer_ReinitsOnceThenSegmentsForwardFromOffset()
{
    // Arrange — 20 MB file; the fake runtime answers every seek with "beyond buffer" at byte 12 MB.
    // The seek must reinit the decoder once and then keep using the same bounded-segment loop from
    // 12 MB to EOF (no forked fetch path — C1/C5).
    const long fileLength = 20L * 1024 * 1024;
    const long resolvedSeekOffset = 12L * 1024 * 1024;
    var handler = new SegmentServer(fileLength);
    var runtime = new FakeJsRuntime(seekByteOffset: resolvedSeekOffset);
    var streamingPlayer = BuildPlayer(handler, runtime);

    (long, long?) firstRefill = (resolvedSeekOffset, resolvedSeekOffset + SegmentSize - 1);
    (long, long?) secondRefill = (resolvedSeekOffset + SegmentSize, resolvedSeekOffset + 2 * SegmentSize - 1);

    // Act — load, note how many requests the load itself made, then seek.
    await streamingPlayer.SelectTrackStreaming(Track());
    var requestsBeforeSeek = handler.AudioRanges.Count;
    await streamingPlayer.Seek(300); // arbitrary time; the scripted seek returns the 12 MB byte offset

    // Segments fetched by the seek/refill: everything after the initial-load segments.
    var refillRequests = handler.AudioRanges.GetRange(
        requestsBeforeSeek,
        handler.AudioRanges.Count - requestsBeforeSeek);

    // Assert
    Assert.Multiple(() =>
    {
        Assert.That(runtime.ReinitCallCount, Is.EqualTo(1),
            "a seek-beyond-buffer reinitializes the decoder exactly once for the header-less continuation");
        Assert.That(refillRequests, Has.Count.EqualTo(2),
            "from 12 MB to 20 MB at a 4 MB segment is two bounded segments (4 MB + 4 MB tail)");
        Assert.That(refillRequests[0], Is.EqualTo(firstRefill),
            "the first refill segment is bounded and starts at the resolved seek offset");
        Assert.That(refillRequests[1], Is.EqualTo(secondRefill),
            "segmentation continues forward from the seek offset — the same loop, the same bounded shape");
    });
}
}