242 lines
11 KiB
C#
242 lines
11 KiB
C#
using System.Net.Http;
|
|
using DeepDrftManager.Services;
|
|
|
|
namespace DeepDrftTests;
|
|
|
|
/// <summary>
|
|
/// Unit tests for <see cref="ProgressStreamContent"/> — the single mechanism feeding both the CMS
|
|
/// upload progress meter and the idle/heartbeat timeout. Two concerns are anchored here:
|
|
/// (1) progress reporting is monotonic and sums to the total content length, and
|
|
/// (2) the idle deadline pattern the content drives (reset CancelAfter on each tick) cancels a
|
|
/// stalled write yet never fires while writes are progressing.
|
|
/// </summary>
|
|
[TestFixture]
|
|
public class ProgressStreamContentTests
|
|
{
|
|
// --- Progress reporting ---
|
|
|
|
/// <summary>
/// Copies a random payload through <see cref="ProgressStreamContent"/> into an in-memory sink and
/// checks that the progress callback fired, that the reported values are in ascending order, that
/// the last reported value equals the payload length, and that the sink received the payload bytes.
/// </summary>
[Test]
public async Task ReportsMonotonicallyIncreasingBytes_SummingToContentLength()
{
    var payload = new byte[256 * 1024 + 7]; // non-chunk-aligned so the final partial read is exercised
    Random.Shared.NextBytes(payload);
    var reports = new List<long>();

    // Scoped with using so the content is disposed when the test ends, like the sink below.
    using var content = new ProgressStreamContent(
        new MemoryStream(payload), payload.Length, written => reports.Add(written));

    using var sink = new MemoryStream();
    await content.CopyToAsync(sink);

    Assert.That(reports, Is.Not.Empty, "Expected at least one progress tick.");
    Assert.That(reports, Is.Ordered.Ascending, "Progress must be monotonically increasing.");
    Assert.That(reports[^1], Is.EqualTo(payload.Length), "Final tick must equal total content length.");
    Assert.That(sink.ToArray(), Is.EqualTo(payload), "Serialized bytes must match the source payload.");
}
|
|
|
|
/// <summary>
/// Checks that the length handed to the constructor is the value exposed through the content's
/// Content-Length header, even though the backing stream here is empty.
/// </summary>
[Test]
public void TryComputeLength_ReturnsProvidedContentLength()
{
    const long declared = 987_654;

    // Scoped with using so the content is disposed when the test ends.
    using var content = new ProgressStreamContent(new MemoryStream(), declared, _ => { });

    Assert.That(content.Headers.ContentLength, Is.EqualTo(declared));
}
|
|
|
|
// --- Idle/heartbeat cancellation ---
|
|
// The content does not own a timer; it owns the progress signal. The upload service wires that
|
|
// signal to CancellationTokenSource.CancelAfter(idle). These tests exercise that exact contract by
|
|
// driving the content with a stream whose read cadence we control.
|
|
|
|
/// <summary>
/// Drives the content with a stream that delivers a chunk every 50 ms while each progress tick
/// re-arms a 200 ms idle deadline, and checks that the deadline never fires and all bytes arrive.
/// </summary>
[Test]
public async Task IdleTimeout_DoesNotFire_WhileWritesAreProgressing()
{
    var idle = TimeSpan.FromMilliseconds(200);
    using var idleCts = new CancellationTokenSource();
    idleCts.CancelAfter(idle);

    // Five chunks, each arriving well within the idle window: progress keeps resetting the deadline.
    var source = new PacedStream(chunkCount: 5, chunkSize: 4096, delayPerChunk: TimeSpan.FromMilliseconds(50));

    // Declared after idleCts so it is disposed first; the callback captures idleCts.
    using var content = new ProgressStreamContent(source, source.TotalLength, _ => idleCts.CancelAfter(idle));

    using var sink = new MemoryStream();
    await content.CopyToAsync(sink);

    Assert.That(idleCts.IsCancellationRequested, Is.False,
        "A steadily progressing upload must not trip the idle heartbeat.");
    Assert.That(sink.Length, Is.EqualTo(source.TotalLength));
}
|
|
|
|
/// <summary>
/// Drives the content with a stream that stalls for 500 ms before its second chunk while the idle
/// deadline is 150 ms, and checks that the copy is cancelled through the idle token.
/// </summary>
[Test]
public void IdleTimeout_Fires_WhenAStallExceedsTheIdleWindow()
{
    var idle = TimeSpan.FromMilliseconds(150);
    using var idleCts = new CancellationTokenSource();
    idleCts.CancelAfter(idle);

    // One quick chunk, then a stall longer than the idle window before the next read returns.
    var source = new PacedStream(
        chunkCount: 2, chunkSize: 4096,
        delayPerChunk: TimeSpan.FromMilliseconds(10),
        stallBeforeChunkIndex: 1, stallDuration: TimeSpan.FromMilliseconds(500));

    // Declared after idleCts so it is disposed first; the callback captures idleCts.
    using var content = new ProgressStreamContent(source, source.TotalLength, _ => idleCts.CancelAfter(idle));

    using var sink = new MemoryStream();

    Assert.That(
        async () => await content.CopyToAsync(sink, idleCts.Token),
        Throws.InstanceOf<OperationCanceledException>(),
        "A stall exceeding the idle window must cancel the in-flight copy.");
    Assert.That(idleCts.IsCancellationRequested, Is.True);
}
|
|
|
|
// --- Two-phase deadline switching (Finding 1 regression test) ---
|
|
// After the last body byte is reported the idle timer must be disarmed and the response-wait
|
|
// budget must be armed. This test simulates the exact scenario that triggered Finding 1:
|
|
// the body streams quickly, then a long server-side lag (standing in for AudioProcessor +
|
|
// vault write + SQL persist) follows. The idle window is short; the response budget is long.
|
|
// With the fix the operation must complete; without it idleCts would fire during the lag.
|
|
|
|
/// <summary>
/// Two-phase deadline check: when the final byte is reported the callback disarms the idle deadline
/// and arms the response budget. A post-body wait that is longer than the idle window (150 ms) but
/// shorter than the response budget (600 ms) must leave the idle, response and linked send tokens
/// uncancelled.
/// </summary>
[Test]
public async Task PostBodyLag_DoesNotTriggerIdleTimeout_WhenResponseBudgetIsLarger()
{
    var idle = TimeSpan.FromMilliseconds(150);
    var responseBudget = TimeSpan.FromMilliseconds(600);

    using var idleCts = new CancellationTokenSource();
    idleCts.CancelAfter(idle);
    using var responseCts = new CancellationTokenSource();
    // responseCts starts disarmed — same as in CmsTrackService.
    using var sendCts = CancellationTokenSource.CreateLinkedTokenSource(idleCts.Token, responseCts.Token);

    const long contentLength = 4096;
    var source = new PacedStream(chunkCount: 1, chunkSize: (int)contentLength, delayPerChunk: TimeSpan.FromMilliseconds(10));

    // Declared after the token sources so it is disposed before them; the callback captures both.
    using var content = new ProgressStreamContent(source, contentLength, written =>
    {
        if (written < contentLength)
        {
            idleCts.CancelAfter(idle);
        }
        else
        {
            // Body complete — disarm idle, arm response budget (mirrors CmsTrackService).
            idleCts.CancelAfter(Timeout.InfiniteTimeSpan);
            responseCts.CancelAfter(responseBudget);
        }
    });

    using var sink = new MemoryStream();
    await content.CopyToAsync(sink, sendCts.Token);

    // Body is done. Simulate a slow server (longer than idle window, shorter than response budget).
    var serverLag = TimeSpan.FromMilliseconds(300); // > idle (150 ms), < response budget (600 ms)
    try
    {
        await Task.Delay(serverLag, sendCts.Token);
    }
    catch (OperationCanceledException)
    {
        // Swallowed on purpose: the assertions below then report which deadline fired,
        // instead of the test dying on a bare cancellation exception.
    }

    Assert.That(sendCts.IsCancellationRequested, Is.False,
        "A post-body server lag within the response budget must not cancel the send token.");
    Assert.That(idleCts.IsCancellationRequested, Is.False,
        "The idle CTS must be disarmed after body completes.");
    Assert.That(responseCts.IsCancellationRequested, Is.False,
        "The response CTS must not have fired — server lag was within the response budget.");
}
|
|
|
|
/// <summary>
/// Two-phase deadline check for the failure side: after the final byte is reported the callback
/// disarms the idle deadline and arms the response budget, so a post-body wait longer than the
/// response budget must be cancelled by the response token while the idle token stays unfired.
/// </summary>
[Test]
public async Task PostBodyLag_CancelsViaResponseCts_WhenResponseBudgetExceeded()
{
    // The idle window is deliberately SHORTER than the response budget: if idle were left armed
    // after the body completed it would fire first, and the assertions below would catch it.
    var idle = TimeSpan.FromMilliseconds(150);
    var responseBudget = TimeSpan.FromMilliseconds(300);

    using var idleCts = new CancellationTokenSource();
    idleCts.CancelAfter(idle);
    using var responseCts = new CancellationTokenSource();
    using var sendCts = CancellationTokenSource.CreateLinkedTokenSource(idleCts.Token, responseCts.Token);

    const long contentLength = 4096;
    var source = new PacedStream(chunkCount: 1, chunkSize: (int)contentLength, delayPerChunk: TimeSpan.FromMilliseconds(10));

    // Declared after the token sources so it is disposed before them; the callback captures both.
    using var content = new ProgressStreamContent(source, contentLength, written =>
    {
        if (written < contentLength)
        {
            idleCts.CancelAfter(idle);
        }
        else
        {
            // Body complete — disarm idle, arm response budget.
            idleCts.CancelAfter(Timeout.InfiniteTimeSpan);
            responseCts.CancelAfter(responseBudget);
        }
    });

    using var sink = new MemoryStream();
    await content.CopyToAsync(sink, sendCts.Token);

    // Simulate a slow server that exceeds the response budget.
    var serverLag = TimeSpan.FromMilliseconds(800); // > response budget (300 ms)
    Assert.That(
        async () => await Task.Delay(serverLag, sendCts.Token),
        Throws.InstanceOf<OperationCanceledException>(),
        "A post-body lag exceeding the response budget must cancel via sendCts.");
    Assert.That(responseCts.IsCancellationRequested, Is.True,
        "responseCts must be the source of the cancellation, not idleCts.");
    Assert.That(idleCts.IsCancellationRequested, Is.False,
        "idleCts must remain disarmed — the response budget fired, not the idle window.");
}
|
|
|
|
/// <summary>
/// A read-only, async-only stream that yields a fixed number of equal-sized chunks of zero bytes,
/// pausing before each chunk to emulate network pacing. Optionally a longer stall replaces the
/// normal pause before one given chunk to emulate a stalled link.
/// </summary>
/// <remarks>
/// When the caller's buffer is smaller than a chunk, the chunk is handed out over several reads and
/// the pause is taken only once, before the first of them, so the stream always delivers exactly
/// <see cref="TotalLength"/> bytes regardless of the buffer size used by the reader.
/// </remarks>
private sealed class PacedStream : Stream
{
    private readonly int _chunkCount;
    private readonly int _chunkSize;
    private readonly TimeSpan _delayPerChunk;
    private readonly int _stallBeforeChunkIndex;
    private readonly TimeSpan _stallDuration;

    // Number of chunks that have been handed out completely.
    private int _chunksRead;

    // Bytes of the current chunk already handed out; 0 means the next read starts a new chunk.
    private int _offsetInChunk;

    /// <summary>Creates a paced stream.</summary>
    /// <param name="chunkCount">Number of chunks the stream yields before reporting end of stream.</param>
    /// <param name="chunkSize">Size of each chunk in bytes.</param>
    /// <param name="delayPerChunk">Pause taken before each chunk that is not the stalled one.</param>
    /// <param name="stallBeforeChunkIndex">Zero-based index of the chunk preceded by the stall; -1 (the default) never matches.</param>
    /// <param name="stallDuration">Pause taken, instead of <paramref name="delayPerChunk"/>, before the stalled chunk.</param>
    public PacedStream(int chunkCount, int chunkSize, TimeSpan delayPerChunk,
        int stallBeforeChunkIndex = -1, TimeSpan stallDuration = default)
    {
        _chunkCount = chunkCount;
        _chunkSize = chunkSize;
        _delayPerChunk = delayPerChunk;
        _stallBeforeChunkIndex = stallBeforeChunkIndex;
        _stallDuration = stallDuration;
    }

    /// <summary>Total number of bytes the stream yields: chunk count times chunk size.</summary>
    public long TotalLength => (long)_chunkCount * _chunkSize;

    /// <summary>
    /// Waits for the configured pause when a new chunk starts, then fills <paramref name="buffer"/>
    /// with as many zero bytes of the current chunk as fit.
    /// </summary>
    /// <param name="buffer">Destination for the bytes.</param>
    /// <param name="cancellationToken">Cancels the pause; cancellation surfaces from <see cref="Task.Delay(TimeSpan, CancellationToken)"/>.</param>
    /// <returns>The number of bytes written to <paramref name="buffer"/>, or 0 once every chunk has been delivered.</returns>
    public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
    {
        if (_chunksRead >= _chunkCount)
        {
            return 0;
        }

        // A zero-length read transfers nothing and must not consume a chunk.
        if (buffer.IsEmpty)
        {
            return 0;
        }

        // Pace only at a chunk boundary; follow-up reads of a partially delivered chunk are immediate.
        if (_offsetInChunk == 0)
        {
            var pause = _chunksRead == _stallBeforeChunkIndex ? _stallDuration : _delayPerChunk;
            await Task.Delay(pause, cancellationToken);
        }

        var count = Math.Min(_chunkSize - _offsetInChunk, buffer.Length);
        buffer.Span[..count].Clear();

        _offsetInChunk += count;
        if (_offsetInChunk >= _chunkSize)
        {
            // Chunk fully delivered: advance to the next one.
            _chunksRead++;
            _offsetInChunk = 0;
        }

        return count;
    }

    /// <summary>
    /// Array-based async read; forwards to the <see cref="Memory{T}"/> overload so it is paced the
    /// same way instead of falling back to the unsupported synchronous <see cref="Read"/>.
    /// </summary>
    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
        => ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();

    /// <summary>Synchronous reads are not supported; always throws.</summary>
    public override int Read(byte[] buffer, int offset, int count)
        => throw new NotSupportedException("Async-only paced stream.");

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => TotalLength;
    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}
|
|
}
|