diff --git a/DeepDrftManager/Components/Pages/Tracks/BatchEdit.razor b/DeepDrftManager/Components/Pages/Tracks/BatchEdit.razor index d344903..89d74d7 100644 --- a/DeepDrftManager/Components/Pages/Tracks/BatchEdit.razor +++ b/DeepDrftManager/Components/Pages/Tracks/BatchEdit.razor @@ -476,9 +476,26 @@ { // New track — upload, then link cover art with a follow-up update (same // two-step pattern as BatchUpload; the upload endpoint takes no imagePath). - await using var wavStream = row.WavFile!.OpenReadStream(MaxUploadBytes); + row.UploadedBytes = 0; + row.TotalBytes = row.WavFile!.Size; + await using var wavStream = row.WavFile.OpenReadStream(MaxUploadBytes); + + // Re-render only on whole-percent change so a large upload paints ~100 frames, + // not thousands. Progress marshals back onto the renderer dispatcher. + var lastPercent = -1; + var progress = new Progress(written => + { + row.UploadedBytes = written; + if (row.UploadPercent != lastPercent) + { + lastPercent = row.UploadPercent; + StateHasChanged(); + } + }); + var uploadResult = await CmsTrackService.UploadTrackAsync( wavStream, + row.WavFile.Size, row.WavFile.Name, row.WavFile.ContentType, row.TrackName, @@ -491,7 +508,8 @@ createdByUserId, _releaseType, trackNumber, - _medium); + _medium, + progress); if (!uploadResult.Success || uploadResult.Value is null) { diff --git a/DeepDrftManager/Components/Pages/Tracks/BatchRowModel.cs b/DeepDrftManager/Components/Pages/Tracks/BatchRowModel.cs index 7f057ee..1d4ab85 100644 --- a/DeepDrftManager/Components/Pages/Tracks/BatchRowModel.cs +++ b/DeepDrftManager/Components/Pages/Tracks/BatchRowModel.cs @@ -27,6 +27,17 @@ public class BatchRowModel public BatchRowStatus Status { get; set; } = BatchRowStatus.Queued; public string? ErrorMessage { get; set; } + + /// Bytes pushed to the wire so far for this row's in-flight upload. Reset per attempt. + public long UploadedBytes { get; set; } + + /// Total payload bytes for this row (the WAV file size), the progress denominator. + public long TotalBytes { get; set; } + + /// Upload completion as a 0–100 percent, or 0 when the total is unknown. + public int UploadPercent => TotalBytes > 0 + ? (int)Math.Clamp(UploadedBytes * 100 / TotalBytes, 0, 100) + : 0; } public enum BatchRowStatus { Queued, Uploading, Done, Failed } diff --git a/DeepDrftManager/Components/Pages/Tracks/BatchTrackList.razor b/DeepDrftManager/Components/Pages/Tracks/BatchTrackList.razor index e05fd12..9e8c7f3 100644 --- a/DeepDrftManager/Components/Pages/Tracks/BatchTrackList.razor +++ b/DeepDrftManager/Components/Pages/Tracks/BatchTrackList.razor @@ -41,6 +41,13 @@ OnClick="@(() => OnRemove.InvokeAsync(index))" aria-label="Remove track" /> + @if (row.Status == BatchRowStatus.Uploading) + { + + } } diff --git a/DeepDrftManager/Components/Pages/Tracks/BatchUpload.razor b/DeepDrftManager/Components/Pages/Tracks/BatchUpload.razor index 0050c19..a256101 100644 --- a/DeepDrftManager/Components/Pages/Tracks/BatchUpload.razor +++ b/DeepDrftManager/Components/Pages/Tracks/BatchUpload.razor @@ -64,6 +64,12 @@ { @* Track name is derived from the Release Name for Session/Mix — no separate input. *@ Selected: @(_tracks[0].WavFile?.Name ?? "—") + @if (_tracks[0].Status == BatchRowStatus.Uploading) + { + + } } @@ -330,14 +336,33 @@ row.Status = BatchRowStatus.Uploading; StateHasChanged(); + row.UploadedBytes = 0; + row.TotalBytes = row.WavFile!.Size; + try { // OpenReadStream streams chunks from the browser via the SignalR circuit; the - // service wraps it in StreamContent so the whole file is never materialised in - // memory before DeepDrftAPI receives it. - await using var wavStream = row.WavFile!.OpenReadStream(MaxUploadBytes); + // service wraps it in ProgressStreamContent so the whole file is never materialised + // in memory before DeepDrftAPI receives it, and reports bytes-on-the-wire back here. + await using var wavStream = row.WavFile.OpenReadStream(MaxUploadBytes); + + // Progress ticks fire ~once per 80 KB; re-render only when the whole-percent changes + // so a half-gig upload paints ~100 frames, not thousands. Progress marshals the + // callback onto the component's renderer dispatcher, so StateHasChanged is safe here. + var lastPercent = -1; + var progress = new Progress(written => + { + row.UploadedBytes = written; + if (row.UploadPercent != lastPercent) + { + lastPercent = row.UploadPercent; + StateHasChanged(); + } + }); + var result = await CmsTrackService.UploadTrackAsync( wavStream, + row.WavFile.Size, row.WavFile.Name, row.WavFile.ContentType, row.TrackName, @@ -350,7 +375,8 @@ createdByUserId, _releaseType, trackNumber, - _medium); + _medium, + progress); if (!result.Success || result.Value is null) { diff --git a/DeepDrftManager/Program.cs b/DeepDrftManager/Program.cs index 6c3941d..923f8dd 100644 --- a/DeepDrftManager/Program.cs +++ b/DeepDrftManager/Program.cs @@ -44,8 +44,10 @@ builder.Services.AddHttpClient("DeepDrft.Content", client => client.BaseAddress = new Uri(contentApiUrl); }); -// Named HttpClient for ApiKey-protected Content API calls (CmsTrackService's vault delete). -// API key baked into the default request headers so callers need not add it manually. +// Named HttpClient for ApiKey-protected Content API calls (CmsTrackService's non-upload operations: +// delete, paged list, metadata read/write, waveform jobs, releases, genres). +// Timeout left at the default 100s — these are short request/response pairs and an infinite timeout +// would hang an InteractiveServer circuit forever on a dead connection. var contentApiKey = builder.Configuration["Api:ContentApiKey"] ?? throw new InvalidOperationException("Api:ContentApiKey is required"); builder.Services.AddHttpClient("DeepDrft.Content.Cms", client => @@ -54,6 +56,17 @@ builder.Services.AddHttpClient("DeepDrft.Content.Cms", client => client.DefaultRequestHeaders.Add("ApiKey", contentApiKey); }); +// Dedicated upload client — inherits the API key but removes the whole-request timeout. +// Large WAV uploads (several hundred MB) outrun the 100s default. The upload path enforces an +// idle/heartbeat deadline instead (body-streaming phase via ProgressStreamContent) plus a separate +// response-wait budget (CmsTrackService), so the client itself must not impose a total cap. +builder.Services.AddHttpClient("DeepDrft.Content.Cms.Upload", client => +{ + client.BaseAddress = new Uri(contentApiUrl); + client.DefaultRequestHeaders.Add("ApiKey", contentApiKey); + client.Timeout = Timeout.InfiniteTimeSpan; +}); + // Reverse-proxy support (nginx in production). builder.Services.Configure(options => { diff --git a/DeepDrftManager/Services/CmsTrackService.cs b/DeepDrftManager/Services/CmsTrackService.cs index fba5f7a..a84c971 100644 --- a/DeepDrftManager/Services/CmsTrackService.cs +++ b/DeepDrftManager/Services/CmsTrackService.cs @@ -18,21 +18,43 @@ namespace DeepDrftManager.Services; public class CmsTrackService : ICmsTrackService { private const string ContentCmsClientName = "DeepDrft.Content.Cms"; + private const string UploadClientName = "DeepDrft.Content.Cms.Upload"; private const string UploadPath = "api/track/upload"; + // Idle/heartbeat window: abort an upload only after this long with zero bytes written to the wire. + // The window resets on every progress tick, so a slow-but-moving half-gig upload never trips it; + // a genuinely stalled socket does. Governs the BODY-STREAMING phase only. + // Operator-tunable via Upload:IdleTimeoutSeconds. + private const int DefaultIdleTimeoutSeconds = 90; + + // Response-wait budget: once the request body is fully on the wire the server runs AudioProcessor + // decode → vault write → SQL persist. For a several-hundred-MB WAV this can take many minutes. + // The idle heartbeat goes silent after the last byte, so a separate, larger deadline governs the + // response-wait phase so a fully-uploaded file is never killed mid-persist. + // Operator-tunable via Upload:ResponseTimeoutSeconds. + private const int DefaultResponseTimeoutSeconds = 600; // 10 minutes + private readonly IHttpClientFactory _httpClientFactory; private readonly ILogger _logger; + private readonly TimeSpan _uploadIdleTimeout; + private readonly TimeSpan _uploadResponseTimeout; public CmsTrackService( IHttpClientFactory httpClientFactory, + IConfiguration configuration, ILogger logger) { _httpClientFactory = httpClientFactory; _logger = logger; + var idleSeconds = configuration.GetValue("Upload:IdleTimeoutSeconds") ?? DefaultIdleTimeoutSeconds; + _uploadIdleTimeout = TimeSpan.FromSeconds(idleSeconds > 0 ? idleSeconds : DefaultIdleTimeoutSeconds); + var responseSeconds = configuration.GetValue("Upload:ResponseTimeoutSeconds") ?? DefaultResponseTimeoutSeconds; + _uploadResponseTimeout = TimeSpan.FromSeconds(responseSeconds > 0 ? responseSeconds : DefaultResponseTimeoutSeconds); } public async Task> UploadTrackAsync( Stream wavStream, + long contentLength, string fileName, string contentType, string trackName, @@ -46,12 +68,55 @@ public class CmsTrackService : ICmsTrackService ReleaseType releaseType, int trackNumber, ReleaseMedium medium = ReleaseMedium.Cut, + IProgress? progress = null, CancellationToken ct = default) { + // Two-phase cancellation for the upload send: + // + // BODY-STREAMING phase (while bytes are on the wire): + // idleCts fires if no progress tick arrives within the idle window. Each + // ProgressStreamContent chunk resets CancelAfter(idle), so a slow-but-moving + // upload never trips it; a genuinely stalled socket does. + // + // RESPONSE-WAIT phase (after the last byte, while the server persists): + // The idle heartbeat goes silent once the body is fully sent. responseCts is + // armed at that moment with a larger budget so a fully-uploaded file is never + // killed mid-persist. idleCts is simultaneously disarmed (CancelAfter(Infinite)) + // so it cannot misfire during the response-wait. + // + // sendCts links both so either deadline — plus the caller's ct — cancels the send. + using var idleCts = CancellationTokenSource.CreateLinkedTokenSource(ct); + idleCts.CancelAfter(_uploadIdleTimeout); + + // responseCts starts disarmed; the body-complete callback below arms it. + using var responseCts = CancellationTokenSource.CreateLinkedTokenSource(ct); + + // Umbrella token passed to SendAsync — either phase token (or the caller) can cancel. + using var sendCts = CancellationTokenSource.CreateLinkedTokenSource(idleCts.Token, responseCts.Token); + // Rebuild the multipart container so the boundary is owned by HttpClient and the // caller-supplied stream (already buffered by the SignalR upload) is the source. using var multipart = new MultipartFormDataContent(); - var wavContent = new StreamContent(wavStream); + var wavContent = new ProgressStreamContent( + wavStream, + contentLength, + written => + { + // One mechanism, three consumers: advance the UI meter, reset the idle heartbeat, + // and on body-complete transition to the response-wait budget. + progress?.Report(written); + if (written < contentLength) + { + // Body still in flight — keep the idle heartbeat alive. + idleCts.CancelAfter(_uploadIdleTimeout); + } + else + { + // Last byte on the wire. Disarm the idle timer and start the response budget. + idleCts.CancelAfter(Timeout.InfiniteTimeSpan); + responseCts.CancelAfter(_uploadResponseTimeout); + } + }); wavContent.Headers.ContentType = new MediaTypeHeaderValue( string.IsNullOrWhiteSpace(contentType) ? "audio/wav" : contentType); multipart.Add(wavContent, "audioFile", fileName); @@ -70,13 +135,31 @@ public class CmsTrackService : ICmsTrackService // for an unrecognised value). Authoritative only when this upload creates the release. multipart.Add(new StringContent(medium.ToString()), "medium"); - var client = _httpClientFactory.CreateClient(ContentCmsClientName); + // Use the dedicated upload client (InfiniteTimeSpan) so the two-phase CTS logic above is the + // sole timeout authority. Non-upload operations use the bounded "DeepDrft.Content.Cms" client. + var client = _httpClientFactory.CreateClient(UploadClientName); using var request = new HttpRequestMessage(HttpMethod.Post, UploadPath) { Content = multipart }; HttpResponseMessage response; try { - response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, ct); + response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, sendCts.Token); + } + catch (OperationCanceledException) when (!ct.IsCancellationRequested) + { + // Either idle window (body-streaming stall) or response-wait budget (server persist too slow). + if (idleCts.IsCancellationRequested) + { + _logger.LogWarning("Upload of {TrackName} stalled — no progress for {IdleSeconds}s; aborting.", + trackName, _uploadIdleTimeout.TotalSeconds); + return ResultContainer.CreateFailResult( + $"Upload stalled — no data transferred for {_uploadIdleTimeout.TotalSeconds:0}s. Please retry."); + } + // responseCts fired: body reached the server but persist timed out. + _logger.LogWarning("Upload of {TrackName} timed out waiting for server response after {ResponseSeconds}s.", + trackName, _uploadResponseTimeout.TotalSeconds); + return ResultContainer.CreateFailResult( + $"Upload timed out waiting for the server to respond after {_uploadResponseTimeout.TotalSeconds:0}s. Please retry."); } catch (Exception ex) { diff --git a/DeepDrftManager/Services/ICmsTrackService.cs b/DeepDrftManager/Services/ICmsTrackService.cs index 016d345..66246d0 100644 --- a/DeepDrftManager/Services/ICmsTrackService.cs +++ b/DeepDrftManager/Services/ICmsTrackService.cs @@ -21,9 +21,14 @@ public interface ICmsTrackService /// sets the parent release's when this upload /// creates the release. The medium is authoritative only on creation — adding a track to an existing /// release never changes its medium (that is the edit path, ). + /// is the total payload size (the browser file's Size); it + /// sets Content-Length and is the denominator for , which reports cumulative + /// bytes pushed to the wire. Each progress tick also resets the idle/heartbeat upload timeout, so a + /// stalled connection aborts without a fixed total-duration cap. /// Task> UploadTrackAsync( Stream wavStream, + long contentLength, string fileName, string contentType, string trackName, @@ -37,6 +42,7 @@ public interface ICmsTrackService ReleaseType releaseType, int trackNumber, ReleaseMedium medium = ReleaseMedium.Cut, + IProgress? progress = null, CancellationToken ct = default); /// diff --git a/DeepDrftManager/Services/ProgressStreamContent.cs b/DeepDrftManager/Services/ProgressStreamContent.cs new file mode 100644 index 0000000..675b2d3 --- /dev/null +++ b/DeepDrftManager/Services/ProgressStreamContent.cs @@ -0,0 +1,64 @@ +using System.Net; + +namespace DeepDrftManager.Services; + +/// +/// An that streams a source stream to the wire while reporting cumulative +/// bytes written after each chunk. This is the single source of truth for both the upload progress +/// meter and the idle/heartbeat timeout: every reported tick advances the UI and resets the +/// idle deadline, so one mechanism feeds both concerns. +/// +/// +/// Wrap the audio payload (not the whole multipart container) so +/// returns the file length and the reported byte counts map directly onto "bytes of this file". +/// +public sealed class ProgressStreamContent : HttpContent +{ + // 80 KB: large enough to keep the socket fed on a healthy link, small enough that a stalled + // connection trips the idle window without a multi-MB write swallowing the whole heartbeat budget. + private const int CopyBufferSize = 81_920; + + private readonly Stream _source; + private readonly long _length; + private readonly Action _onBytesWritten; + + /// The payload stream. Read once, sequentially — not seekable-rewound. + /// Total bytes the source will yield; sets Content-Length and the meter denominator. + /// Invoked after each chunk with the cumulative bytes written so far. + public ProgressStreamContent(Stream source, long length, Action onBytesWritten) + { + _source = source; + _length = length; + _onBytesWritten = onBytesWritten; + } + + // Token-aware overload (.NET 5+): HttpClient calls this on the send path and passes the request's + // CancellationToken, so the idle-heartbeat CTS aborts an in-flight read/write promptly — not just + // between chunks. The parameterless base override delegates here with CancellationToken.None. + protected override Task SerializeToStreamAsync(Stream stream, TransportContext? context, CancellationToken cancellationToken) + => CopyAsync(stream, cancellationToken); + + protected override Task SerializeToStreamAsync(Stream stream, TransportContext? context) + => CopyAsync(stream, CancellationToken.None); + + private async Task CopyAsync(Stream stream, CancellationToken cancellationToken) + { + var buffer = new byte[CopyBufferSize]; + long written = 0; + int read; + while ((read = await _source.ReadAsync(buffer, cancellationToken)) > 0) + { + await stream.WriteAsync(buffer.AsMemory(0, read), cancellationToken); + written += read; + // Report after the bytes are on the wire — a tick means real forward progress, which is + // exactly the signal the idle heartbeat must reset on. + _onBytesWritten(written); + } + } + + protected override bool TryComputeLength(out long length) + { + length = _length; + return true; + } +} diff --git a/DeepDrftTests/DeepDrftTests.csproj b/DeepDrftTests/DeepDrftTests.csproj index cab1aed..b8478d1 100644 --- a/DeepDrftTests/DeepDrftTests.csproj +++ b/DeepDrftTests/DeepDrftTests.csproj @@ -30,6 +30,9 @@ + + diff --git a/DeepDrftTests/ProgressStreamContentTests.cs b/DeepDrftTests/ProgressStreamContentTests.cs new file mode 100644 index 0000000..c87fe94 --- /dev/null +++ b/DeepDrftTests/ProgressStreamContentTests.cs @@ -0,0 +1,241 @@ +using System.Net.Http; +using DeepDrftManager.Services; + +namespace DeepDrftTests; + +/// +/// Unit tests for — the single mechanism feeding both the CMS +/// upload progress meter and the idle/heartbeat timeout. Two concerns are anchored here: +/// (1) progress reporting is monotonic and sums to the total content length, and +/// (2) the idle deadline pattern the content drives (reset CancelAfter on each tick) cancels a +/// stalled write yet never fires while writes are progressing. +/// +[TestFixture] +public class ProgressStreamContentTests +{ + // --- Progress reporting --- + + [Test] + public async Task ReportsMonotonicallyIncreasingBytes_SummingToContentLength() + { + var payload = new byte[256 * 1024 + 7]; // non-chunk-aligned so the final partial read is exercised + Random.Shared.NextBytes(payload); + var reports = new List(); + + var content = new ProgressStreamContent( + new MemoryStream(payload), payload.Length, written => reports.Add(written)); + + using var sink = new MemoryStream(); + await content.CopyToAsync(sink); + + Assert.That(reports, Is.Not.Empty, "Expected at least one progress tick."); + Assert.That(reports, Is.Ordered.Ascending, "Progress must be monotonically increasing."); + Assert.That(reports[^1], Is.EqualTo(payload.Length), "Final tick must equal total content length."); + Assert.That(sink.ToArray(), Is.EqualTo(payload), "Serialized bytes must match the source payload."); + } + + [Test] + public void TryComputeLength_ReturnsProvidedContentLength() + { + const long declared = 987_654; + var content = new ProgressStreamContent(new MemoryStream(), declared, _ => { }); + + Assert.That(content.Headers.ContentLength, Is.EqualTo(declared)); + } + + // --- Idle/heartbeat cancellation --- + // The content does not own a timer; it owns the progress signal. The upload service wires that + // signal to CancellationTokenSource.CancelAfter(idle). These tests exercise that exact contract by + // driving the content with a stream whose read cadence we control. + + [Test] + public async Task IdleTimeout_DoesNotFire_WhileWritesAreProgressing() + { + var idle = TimeSpan.FromMilliseconds(200); + using var idleCts = new CancellationTokenSource(); + idleCts.CancelAfter(idle); + + // Five chunks, each arriving well within the idle window: progress keeps resetting the deadline. + var source = new PacedStream(chunkCount: 5, chunkSize: 4096, delayPerChunk: TimeSpan.FromMilliseconds(50)); + var content = new ProgressStreamContent(source, source.TotalLength, _ => idleCts.CancelAfter(idle)); + + using var sink = new MemoryStream(); + await content.CopyToAsync(sink); + + Assert.That(idleCts.IsCancellationRequested, Is.False, + "A steadily progressing upload must not trip the idle heartbeat."); + Assert.That(sink.Length, Is.EqualTo(source.TotalLength)); + } + + [Test] + public void IdleTimeout_Fires_WhenAStallExceedsTheIdleWindow() + { + var idle = TimeSpan.FromMilliseconds(150); + using var idleCts = new CancellationTokenSource(); + idleCts.CancelAfter(idle); + + // One quick chunk, then a stall longer than the idle window before the next read returns. + var source = new PacedStream( + chunkCount: 2, chunkSize: 4096, + delayPerChunk: TimeSpan.FromMilliseconds(10), + stallBeforeChunkIndex: 1, stallDuration: TimeSpan.FromMilliseconds(500)); + var content = new ProgressStreamContent(source, source.TotalLength, _ => idleCts.CancelAfter(idle)); + + using var sink = new MemoryStream(); + + Assert.That( + async () => await content.CopyToAsync(sink, idleCts.Token), + Throws.InstanceOf(), + "A stall exceeding the idle window must cancel the in-flight copy."); + Assert.That(idleCts.IsCancellationRequested, Is.True); + } + + // --- Two-phase deadline switching (Finding 1 regression test) --- + // After the last body byte is reported the idle timer must be disarmed and the response-wait + // budget must be armed. This test simulates the exact scenario that triggered Finding 1: + // the body streams quickly, then a long server-side lag (standing in for AudioProcessor + + // vault write + SQL persist) follows. The idle window is short; the response budget is long. + // With the fix the operation must complete; without it idleCts would fire during the lag. + + [Test] + public async Task PostBodyLag_DoesNotTriggerIdleTimeout_WhenResponseBudgetIsLarger() + { + var idle = TimeSpan.FromMilliseconds(150); + var responseBudget = TimeSpan.FromMilliseconds(600); + + using var idleCts = new CancellationTokenSource(); + idleCts.CancelAfter(idle); + using var responseCts = new CancellationTokenSource(); + // responseCts starts disarmed — same as in CmsTrackService. + using var sendCts = CancellationTokenSource.CreateLinkedTokenSource(idleCts.Token, responseCts.Token); + + const long contentLength = 4096; + var source = new PacedStream(chunkCount: 1, chunkSize: (int)contentLength, delayPerChunk: TimeSpan.FromMilliseconds(10)); + var content = new ProgressStreamContent(source, contentLength, written => + { + if (written < contentLength) + { + idleCts.CancelAfter(idle); + } + else + { + // Body complete — disarm idle, arm response budget (mirrors CmsTrackService). + idleCts.CancelAfter(Timeout.InfiniteTimeSpan); + responseCts.CancelAfter(responseBudget); + } + }); + + using var sink = new MemoryStream(); + await content.CopyToAsync(sink, sendCts.Token); + + // Body is done. Simulate a slow server (longer than idle window, shorter than response budget). + var serverLag = TimeSpan.FromMilliseconds(300); // > idle (150 ms), < response budget (600 ms) + await Task.Delay(serverLag, sendCts.Token); + + Assert.That(sendCts.IsCancellationRequested, Is.False, + "A post-body server lag within the response budget must not cancel the send token."); + Assert.That(idleCts.IsCancellationRequested, Is.False, + "The idle CTS must be disarmed after body completes."); + Assert.That(responseCts.IsCancellationRequested, Is.False, + "The response CTS must not have fired — server lag was within the response budget."); + } + + [Test] + public async Task PostBodyLag_CancelsViaResponseCts_WhenResponseBudgetExceeded() + { + var idle = TimeSpan.FromMilliseconds(200); + var responseBudget = TimeSpan.FromMilliseconds(150); + + using var idleCts = new CancellationTokenSource(); + idleCts.CancelAfter(idle); + using var responseCts = new CancellationTokenSource(); + using var sendCts = CancellationTokenSource.CreateLinkedTokenSource(idleCts.Token, responseCts.Token); + + const long contentLength = 4096; + var source = new PacedStream(chunkCount: 1, chunkSize: (int)contentLength, delayPerChunk: TimeSpan.FromMilliseconds(10)); + var content = new ProgressStreamContent(source, contentLength, written => + { + if (written < contentLength) + { + idleCts.CancelAfter(idle); + } + else + { + idleCts.CancelAfter(Timeout.InfiniteTimeSpan); + responseCts.CancelAfter(responseBudget); + } + }); + + using var sink = new MemoryStream(); + await content.CopyToAsync(sink, sendCts.Token); + + // Simulate a slow server that exceeds the response budget. + var serverLag = TimeSpan.FromMilliseconds(400); // > response budget (150 ms) + Assert.That( + async () => await Task.Delay(serverLag, sendCts.Token), + Throws.InstanceOf(), + "A post-body lag exceeding the response budget must cancel via sendCts."); + Assert.That(responseCts.IsCancellationRequested, Is.True, + "responseCts must be the source of the cancellation, not idleCts."); + Assert.That(idleCts.IsCancellationRequested, Is.False, + "idleCts must remain disarmed — the response budget fired, not the idle window."); + } + + /// + /// A read-only stream that yields a fixed number of equal chunks, pausing between reads to emulate + /// network pacing. Optionally inserts a longer stall before a given chunk to emulate a stalled link. + /// + private sealed class PacedStream : Stream + { + private readonly int _chunkCount; + private readonly int _chunkSize; + private readonly TimeSpan _delayPerChunk; + private readonly int _stallBeforeChunkIndex; + private readonly TimeSpan _stallDuration; + private int _chunksRead; + + public PacedStream(int chunkCount, int chunkSize, TimeSpan delayPerChunk, + int stallBeforeChunkIndex = -1, TimeSpan stallDuration = default) + { + _chunkCount = chunkCount; + _chunkSize = chunkSize; + _delayPerChunk = delayPerChunk; + _stallBeforeChunkIndex = stallBeforeChunkIndex; + _stallDuration = stallDuration; + } + + public long TotalLength => (long)_chunkCount * _chunkSize; + + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + if (_chunksRead >= _chunkCount) return 0; + + if (_chunksRead == _stallBeforeChunkIndex) + { + await Task.Delay(_stallDuration, cancellationToken); + } + else + { + await Task.Delay(_delayPerChunk, cancellationToken); + } + + var count = Math.Min(_chunkSize, buffer.Length); + buffer.Span[..count].Clear(); + _chunksRead++; + return count; + } + + public override int Read(byte[] buffer, int offset, int count) + => throw new NotSupportedException("Async-only paced stream."); + + public override bool CanRead => true; + public override bool CanSeek => false; + public override bool CanWrite => false; + public override long Length => TotalLength; + public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); } + public override void Flush() { } + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); + public override void SetLength(long value) => throw new NotSupportedException(); + public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException(); + } +}