using System.Net.Http; using DeepDrftManager.Services; namespace DeepDrftTests; /// /// Unit tests for — the single mechanism feeding both the CMS /// upload progress meter and the idle/heartbeat timeout. Two concerns are anchored here: /// (1) progress reporting is monotonic and sums to the total content length, and /// (2) the idle deadline pattern the content drives (reset CancelAfter on each tick) cancels a /// stalled write yet never fires while writes are progressing. /// [TestFixture] public class ProgressStreamContentTests { // --- Progress reporting --- [Test] public async Task ReportsMonotonicallyIncreasingBytes_SummingToContentLength() { var payload = new byte[256 * 1024 + 7]; // non-chunk-aligned so the final partial read is exercised Random.Shared.NextBytes(payload); var reports = new List(); var content = new ProgressStreamContent( new MemoryStream(payload), payload.Length, written => reports.Add(written)); using var sink = new MemoryStream(); await content.CopyToAsync(sink); Assert.That(reports, Is.Not.Empty, "Expected at least one progress tick."); Assert.That(reports, Is.Ordered.Ascending, "Progress must be monotonically increasing."); Assert.That(reports[^1], Is.EqualTo(payload.Length), "Final tick must equal total content length."); Assert.That(sink.ToArray(), Is.EqualTo(payload), "Serialized bytes must match the source payload."); } [Test] public void TryComputeLength_ReturnsProvidedContentLength() { const long declared = 987_654; var content = new ProgressStreamContent(new MemoryStream(), declared, _ => { }); Assert.That(content.Headers.ContentLength, Is.EqualTo(declared)); } // --- Idle/heartbeat cancellation --- // The content does not own a timer; it owns the progress signal. The upload service wires that // signal to CancellationTokenSource.CancelAfter(idle). These tests exercise that exact contract by // driving the content with a stream whose read cadence we control. [Test] public async Task IdleTimeout_DoesNotFire_WhileWritesAreProgressing() { var idle = TimeSpan.FromMilliseconds(200); using var idleCts = new CancellationTokenSource(); idleCts.CancelAfter(idle); // Five chunks, each arriving well within the idle window: progress keeps resetting the deadline. var source = new PacedStream(chunkCount: 5, chunkSize: 4096, delayPerChunk: TimeSpan.FromMilliseconds(50)); var content = new ProgressStreamContent(source, source.TotalLength, _ => idleCts.CancelAfter(idle)); using var sink = new MemoryStream(); await content.CopyToAsync(sink); Assert.That(idleCts.IsCancellationRequested, Is.False, "A steadily progressing upload must not trip the idle heartbeat."); Assert.That(sink.Length, Is.EqualTo(source.TotalLength)); } [Test] public void IdleTimeout_Fires_WhenAStallExceedsTheIdleWindow() { var idle = TimeSpan.FromMilliseconds(150); using var idleCts = new CancellationTokenSource(); idleCts.CancelAfter(idle); // One quick chunk, then a stall longer than the idle window before the next read returns. var source = new PacedStream( chunkCount: 2, chunkSize: 4096, delayPerChunk: TimeSpan.FromMilliseconds(10), stallBeforeChunkIndex: 1, stallDuration: TimeSpan.FromMilliseconds(500)); var content = new ProgressStreamContent(source, source.TotalLength, _ => idleCts.CancelAfter(idle)); using var sink = new MemoryStream(); Assert.That( async () => await content.CopyToAsync(sink, idleCts.Token), Throws.InstanceOf(), "A stall exceeding the idle window must cancel the in-flight copy."); Assert.That(idleCts.IsCancellationRequested, Is.True); } // --- Two-phase deadline switching (Finding 1 regression test) --- // After the last body byte is reported the idle timer must be disarmed and the response-wait // budget must be armed. This test simulates the exact scenario that triggered Finding 1: // the body streams quickly, then a long server-side lag (standing in for AudioProcessor + // vault write + SQL persist) follows. The idle window is short; the response budget is long. // With the fix the operation must complete; without it idleCts would fire during the lag. [Test] public async Task PostBodyLag_DoesNotTriggerIdleTimeout_WhenResponseBudgetIsLarger() { var idle = TimeSpan.FromMilliseconds(150); var responseBudget = TimeSpan.FromMilliseconds(600); using var idleCts = new CancellationTokenSource(); idleCts.CancelAfter(idle); using var responseCts = new CancellationTokenSource(); // responseCts starts disarmed — same as in CmsTrackService. using var sendCts = CancellationTokenSource.CreateLinkedTokenSource(idleCts.Token, responseCts.Token); const long contentLength = 4096; var source = new PacedStream(chunkCount: 1, chunkSize: (int)contentLength, delayPerChunk: TimeSpan.FromMilliseconds(10)); var content = new ProgressStreamContent(source, contentLength, written => { if (written < contentLength) { idleCts.CancelAfter(idle); } else { // Body complete — disarm idle, arm response budget (mirrors CmsTrackService). idleCts.CancelAfter(Timeout.InfiniteTimeSpan); responseCts.CancelAfter(responseBudget); } }); using var sink = new MemoryStream(); await content.CopyToAsync(sink, sendCts.Token); // Body is done. Simulate a slow server (longer than idle window, shorter than response budget). var serverLag = TimeSpan.FromMilliseconds(300); // > idle (150 ms), < response budget (600 ms) await Task.Delay(serverLag, sendCts.Token); Assert.That(sendCts.IsCancellationRequested, Is.False, "A post-body server lag within the response budget must not cancel the send token."); Assert.That(idleCts.IsCancellationRequested, Is.False, "The idle CTS must be disarmed after body completes."); Assert.That(responseCts.IsCancellationRequested, Is.False, "The response CTS must not have fired — server lag was within the response budget."); } [Test] public async Task PostBodyLag_CancelsViaResponseCts_WhenResponseBudgetExceeded() { var idle = TimeSpan.FromMilliseconds(200); var responseBudget = TimeSpan.FromMilliseconds(150); using var idleCts = new CancellationTokenSource(); idleCts.CancelAfter(idle); using var responseCts = new CancellationTokenSource(); using var sendCts = CancellationTokenSource.CreateLinkedTokenSource(idleCts.Token, responseCts.Token); const long contentLength = 4096; var source = new PacedStream(chunkCount: 1, chunkSize: (int)contentLength, delayPerChunk: TimeSpan.FromMilliseconds(10)); var content = new ProgressStreamContent(source, contentLength, written => { if (written < contentLength) { idleCts.CancelAfter(idle); } else { idleCts.CancelAfter(Timeout.InfiniteTimeSpan); responseCts.CancelAfter(responseBudget); } }); using var sink = new MemoryStream(); await content.CopyToAsync(sink, sendCts.Token); // Simulate a slow server that exceeds the response budget. var serverLag = TimeSpan.FromMilliseconds(400); // > response budget (150 ms) Assert.That( async () => await Task.Delay(serverLag, sendCts.Token), Throws.InstanceOf(), "A post-body lag exceeding the response budget must cancel via sendCts."); Assert.That(responseCts.IsCancellationRequested, Is.True, "responseCts must be the source of the cancellation, not idleCts."); Assert.That(idleCts.IsCancellationRequested, Is.False, "idleCts must remain disarmed — the response budget fired, not the idle window."); } /// /// A read-only stream that yields a fixed number of equal chunks, pausing between reads to emulate /// network pacing. Optionally inserts a longer stall before a given chunk to emulate a stalled link. /// private sealed class PacedStream : Stream { private readonly int _chunkCount; private readonly int _chunkSize; private readonly TimeSpan _delayPerChunk; private readonly int _stallBeforeChunkIndex; private readonly TimeSpan _stallDuration; private int _chunksRead; public PacedStream(int chunkCount, int chunkSize, TimeSpan delayPerChunk, int stallBeforeChunkIndex = -1, TimeSpan stallDuration = default) { _chunkCount = chunkCount; _chunkSize = chunkSize; _delayPerChunk = delayPerChunk; _stallBeforeChunkIndex = stallBeforeChunkIndex; _stallDuration = stallDuration; } public long TotalLength => (long)_chunkCount * _chunkSize; public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) { if (_chunksRead >= _chunkCount) return 0; if (_chunksRead == _stallBeforeChunkIndex) { await Task.Delay(_stallDuration, cancellationToken); } else { await Task.Delay(_delayPerChunk, cancellationToken); } var count = Math.Min(_chunkSize, buffer.Length); buffer.Span[..count].Clear(); _chunksRead++; return count; } public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException("Async-only paced stream."); public override bool CanRead => true; public override bool CanSeek => false; public override bool CanWrite => false; public override long Length => TotalLength; public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); } public override void Flush() { } public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); public override void SetLength(long value) => throw new NotSupportedException(); public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException(); } }