// File: deepdrft/DeepDrftTests/ProgressStreamContentTests.cs
// Listing metadata from the source viewer: 242 lines, 11 KiB, C#.

using System.Net.Http;
using DeepDrftManager.Services;
namespace DeepDrftTests;
/// <summary>
/// Unit tests for <see cref="ProgressStreamContent"/> — the single mechanism feeding both the CMS
/// upload progress meter and the idle/heartbeat timeout. Two concerns are anchored here:
/// (1) progress reporting is monotonic and sums to the total content length, and
/// (2) the idle deadline pattern the content drives (reset CancelAfter on each tick) cancels a
/// stalled write yet never fires while writes are progressing.
/// </summary>
[TestFixture]
public class ProgressStreamContentTests
{
// --- Progress reporting ---
[Test]
public async Task ReportsMonotonicallyIncreasingBytes_SummingToContentLength()
{
    // Arrange: 256 KiB plus 7 stray bytes, so the length is not a multiple of any power-of-two
    // buffer size and the copy has to finish on a short read.
    const int payloadSize = (256 * 1024) + 7;
    byte[] expectedBytes = new byte[payloadSize];
    Random.Shared.NextBytes(expectedBytes);

    List<long> ticks = new();
    var content = new ProgressStreamContent(
        new MemoryStream(expectedBytes),
        expectedBytes.Length,
        bytesSoFar => ticks.Add(bytesSoFar));

    // Act: serialize the content into an in-memory destination.
    using MemoryStream destination = new();
    await content.CopyToAsync(destination);

    // Assert: ticks exist, never go backwards, end on the full length, and the bytes round-trip.
    Assert.That(ticks, Is.Not.Empty, "Expected at least one progress tick.");
    Assert.That(ticks, Is.Ordered.Ascending, "Progress must be monotonically increasing.");
    Assert.That(ticks[^1], Is.EqualTo(expectedBytes.Length), "Final tick must equal total content length.");
    Assert.That(destination.ToArray(), Is.EqualTo(expectedBytes), "Serialized bytes must match the source payload.");
}
[Test]
public void TryComputeLength_ReturnsProvidedContentLength()
{
    // The source stream is empty on purpose: the header must echo the length handed to the
    // constructor rather than anything derived from the stream.
    const long declaredLength = 987_654;
    var emptySource = new MemoryStream();
    var content = new ProgressStreamContent(emptySource, declaredLength, _ => { });

    var advertisedLength = content.Headers.ContentLength;

    Assert.That(advertisedLength, Is.EqualTo(declaredLength));
}
// --- Idle/heartbeat cancellation ---
// The content does not own a timer; it owns the progress signal. The upload service wires that
// signal to CancellationTokenSource.CancelAfter(idle). These tests exercise that exact contract by
// driving the content with a stream whose read cadence we control.
[Test]
public async Task IdleTimeout_DoesNotFire_WhileWritesAreProgressing()
{
    TimeSpan idleWindow = TimeSpan.FromMilliseconds(200);
    using CancellationTokenSource heartbeat = new();
    heartbeat.CancelAfter(idleWindow);

    // Five 4 KiB chunks spaced 50 ms apart: every gap is well inside the 200 ms idle window, and
    // each progress tick re-arms the deadline for another full window.
    var pacedSource = new PacedStream(
        chunkCount: 5,
        chunkSize: 4096,
        delayPerChunk: TimeSpan.FromMilliseconds(50));
    var content = new ProgressStreamContent(
        pacedSource,
        pacedSource.TotalLength,
        _ => heartbeat.CancelAfter(idleWindow));

    using MemoryStream destination = new();
    await content.CopyToAsync(destination);

    Assert.That(heartbeat.IsCancellationRequested, Is.False,
        "A steadily progressing upload must not trip the idle heartbeat.");
    Assert.That(destination.Length, Is.EqualTo(pacedSource.TotalLength));
}
[Test]
public void IdleTimeout_Fires_WhenAStallExceedsTheIdleWindow()
{
    TimeSpan idleWindow = TimeSpan.FromMilliseconds(150);
    using CancellationTokenSource heartbeat = new();
    heartbeat.CancelAfter(idleWindow);

    // Chunk 0 is paced at 10 ms; chunk 1 is held back for 500 ms, which is longer than the
    // 150 ms idle window that the first progress tick re-armed.
    var stallingSource = new PacedStream(
        chunkCount: 2,
        chunkSize: 4096,
        delayPerChunk: TimeSpan.FromMilliseconds(10),
        stallBeforeChunkIndex: 1,
        stallDuration: TimeSpan.FromMilliseconds(500));
    var content = new ProgressStreamContent(
        stallingSource,
        stallingSource.TotalLength,
        _ => heartbeat.CancelAfter(idleWindow));

    using MemoryStream destination = new();

    // The copy runs under the heartbeat token, so the stall must surface as a cancellation.
    Assert.That(
        async () => await content.CopyToAsync(destination, heartbeat.Token),
        Throws.InstanceOf<OperationCanceledException>(),
        "A stall exceeding the idle window must cancel the in-flight copy.");
    Assert.That(heartbeat.IsCancellationRequested, Is.True);
}
// --- Two-phase deadline switching (Finding 1 regression test) ---
// After the last body byte is reported the idle timer must be disarmed and the response-wait
// budget must be armed. This test simulates the exact scenario that triggered Finding 1:
// the body streams quickly, then a long server-side lag (standing in for AudioProcessor +
// vault write + SQL persist) follows. The idle window is short; the response budget is long.
// With the fix the operation must complete; without it idleCts would fire during the lag.
[Test]
public async Task PostBodyLag_DoesNotTriggerIdleTimeout_WhenResponseBudgetIsLarger()
{
    TimeSpan idleWindow = TimeSpan.FromMilliseconds(150);
    TimeSpan responseWindow = TimeSpan.FromMilliseconds(600);

    using CancellationTokenSource idleDeadline = new();
    idleDeadline.CancelAfter(idleWindow);

    // Created without a timer; it is only armed once the body has been fully reported
    // (same as in CmsTrackService).
    using CancellationTokenSource responseDeadline = new();
    using CancellationTokenSource sendDeadline =
        CancellationTokenSource.CreateLinkedTokenSource(idleDeadline.Token, responseDeadline.Token);

    const long contentLength = 4096;
    var pacedSource = new PacedStream(
        chunkCount: 1,
        chunkSize: (int)contentLength,
        delayPerChunk: TimeSpan.FromMilliseconds(10));
    var content = new ProgressStreamContent(pacedSource, contentLength, bytesSoFar =>
    {
        if (bytesSoFar >= contentLength)
        {
            // Body complete: disarm the idle timer and start the response budget instead
            // (mirrors CmsTrackService).
            idleDeadline.CancelAfter(Timeout.InfiniteTimeSpan);
            responseDeadline.CancelAfter(responseWindow);
            return;
        }

        // Still mid-body: push the idle deadline out by another window.
        idleDeadline.CancelAfter(idleWindow);
    });

    using MemoryStream destination = new();
    await content.CopyToAsync(destination, sendDeadline.Token);

    // Stand-in for a slow server: 300 ms is longer than the idle window (150 ms) but shorter
    // than the response budget (600 ms).
    TimeSpan simulatedServerLag = TimeSpan.FromMilliseconds(300);
    await Task.Delay(simulatedServerLag, sendDeadline.Token);

    Assert.That(sendDeadline.IsCancellationRequested, Is.False,
        "A post-body server lag within the response budget must not cancel the send token.");
    Assert.That(idleDeadline.IsCancellationRequested, Is.False,
        "The idle CTS must be disarmed after body completes.");
    Assert.That(responseDeadline.IsCancellationRequested, Is.False,
        "The response CTS must not have fired — server lag was within the response budget.");
}
[Test]
public async Task PostBodyLag_CancelsViaResponseCts_WhenResponseBudgetExceeded()
{
    TimeSpan idleWindow = TimeSpan.FromMilliseconds(200);
    TimeSpan responseWindow = TimeSpan.FromMilliseconds(150);

    using CancellationTokenSource idleDeadline = new();
    idleDeadline.CancelAfter(idleWindow);
    using CancellationTokenSource responseDeadline = new();
    using CancellationTokenSource sendDeadline =
        CancellationTokenSource.CreateLinkedTokenSource(idleDeadline.Token, responseDeadline.Token);

    const long contentLength = 4096;
    var pacedSource = new PacedStream(
        chunkCount: 1,
        chunkSize: (int)contentLength,
        delayPerChunk: TimeSpan.FromMilliseconds(10));
    var content = new ProgressStreamContent(pacedSource, contentLength, bytesSoFar =>
    {
        if (bytesSoFar >= contentLength)
        {
            // Body complete: swap the idle timer for the response budget.
            idleDeadline.CancelAfter(Timeout.InfiniteTimeSpan);
            responseDeadline.CancelAfter(responseWindow);
            return;
        }

        // Still mid-body: push the idle deadline out by another window.
        idleDeadline.CancelAfter(idleWindow);
    });

    using MemoryStream destination = new();
    await content.CopyToAsync(destination, sendDeadline.Token);

    // Stand-in for a server that takes 400 ms, overshooting the 150 ms response budget.
    TimeSpan simulatedServerLag = TimeSpan.FromMilliseconds(400);
    Assert.That(
        async () => await Task.Delay(simulatedServerLag, sendDeadline.Token),
        Throws.InstanceOf<OperationCanceledException>(),
        "A post-body lag exceeding the response budget must cancel via sendCts.");

    Assert.That(responseDeadline.IsCancellationRequested, Is.True,
        "responseCts must be the source of the cancellation, not idleCts.");
    Assert.That(idleDeadline.IsCancellationRequested, Is.False,
        "idleCts must remain disarmed — the response budget fired, not the idle window.");
}
/// <summary>
/// A read-only, async-only stream that yields a fixed number of equally sized, zero-filled chunks,
/// pausing before each chunk to emulate network pacing. Optionally replaces the pause before one
/// chosen chunk with a longer stall to emulate a stalled link.
/// </summary>
/// <remarks>
/// The pause is taken once per chunk, not once per read: when the caller's buffer is smaller than a
/// chunk, the rest of that chunk is handed out by the following reads without further delay, so the
/// stream always yields exactly <see cref="TotalLength"/> bytes regardless of the reader's buffer size.
/// </remarks>
private sealed class PacedStream : Stream
{
    private readonly int _chunkCount;
    private readonly int _chunkSize;
    private readonly TimeSpan _delayPerChunk;
    private readonly int _stallBeforeChunkIndex;
    private readonly TimeSpan _stallDuration;

    // Number of chunks fully delivered so far; also the index of the chunk currently being served.
    private int _chunksRead;

    // Bytes of the current chunk already handed to the caller (0 means the chunk has not started).
    private int _offsetInChunk;

    /// <summary>Creates a paced stream.</summary>
    /// <param name="chunkCount">Number of chunks the stream yields before signalling end of stream.</param>
    /// <param name="chunkSize">Size of every chunk, in bytes.</param>
    /// <param name="delayPerChunk">Pause taken before each chunk that is not the stalled one.</param>
    /// <param name="stallBeforeChunkIndex">
    /// Zero-based index of the chunk preceded by <paramref name="stallDuration"/> instead of
    /// <paramref name="delayPerChunk"/>; the default of -1 never matches, so no chunk stalls.
    /// </param>
    /// <param name="stallDuration">Length of the stall before the chosen chunk.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="chunkCount"/> or <paramref name="chunkSize"/> is negative.
    /// </exception>
    public PacedStream(int chunkCount, int chunkSize, TimeSpan delayPerChunk,
        int stallBeforeChunkIndex = -1, TimeSpan stallDuration = default)
    {
        if (chunkCount < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(chunkCount));
        }

        if (chunkSize < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(chunkSize));
        }

        _chunkCount = chunkCount;
        _chunkSize = chunkSize;
        _delayPerChunk = delayPerChunk;
        _stallBeforeChunkIndex = stallBeforeChunkIndex;
        _stallDuration = stallDuration;
    }

    /// <summary>Total number of bytes the stream yields: chunk count times chunk size.</summary>
    public long TotalLength => (long)_chunkCount * _chunkSize;

    /// <summary>
    /// Delivers up to one chunk's worth of zero bytes, pausing first when the read starts a new chunk.
    /// </summary>
    /// <param name="buffer">Destination for the bytes.</param>
    /// <param name="cancellationToken">Observed while pausing; cancelling it aborts the pause.</param>
    /// <returns>The number of bytes written to <paramref name="buffer"/>, or 0 at end of stream.</returns>
    public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
    {
        if (_chunksRead >= _chunkCount)
        {
            return 0;
        }

        // A zero-length read must neither pause nor consume any of the pending chunk.
        if (buffer.IsEmpty)
        {
            return 0;
        }

        // Pause only at the start of a chunk, so a reader with a small buffer sees the same pacing
        // as one whose buffer holds a whole chunk.
        if (_offsetInChunk == 0)
        {
            var pause = _chunksRead == _stallBeforeChunkIndex ? _stallDuration : _delayPerChunk;
            await Task.Delay(pause, cancellationToken);
        }

        var count = Math.Min(_chunkSize - _offsetInChunk, buffer.Length);
        buffer.Span[..count].Clear();

        // Only advance to the next chunk once this one is fully delivered; otherwise a short read
        // would silently drop the remainder and the stream would yield fewer than TotalLength bytes.
        _offsetInChunk += count;
        if (_offsetInChunk >= _chunkSize)
        {
            _chunksRead++;
            _offsetInChunk = 0;
        }

        return count;
    }

    /// <summary>
    /// Array-based overload, routed to the <see cref="Memory{T}"/> overload so that callers using it
    /// do not reach the base implementation, which relies on the unsupported synchronous
    /// <see cref="Read(byte[], int, int)"/>.
    /// </summary>
    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
        => ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();

    /// <summary>Not supported: the stream can only be read asynchronously.</summary>
    public override int Read(byte[] buffer, int offset, int count)
        => throw new NotSupportedException("Async-only paced stream.");

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;

    /// <summary>Same value as <see cref="TotalLength"/>.</summary>
    public override long Length => TotalLength;

    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }

    /// <summary>No-op: the stream is read-only, so there is nothing to flush.</summary>
    public override void Flush() { }

    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}
}