Merge cms-upload-heartbeat into dev (large CMS upload: idle/heartbeat timeout, two-phase response budget, per-file progress meter)

This commit is contained in:
daniel-c-harvey
2026-06-17 11:27:55 -04:00
10 changed files with 483 additions and 11 deletions
@@ -476,9 +476,26 @@
{
// New track — upload, then link cover art with a follow-up update (same
// two-step pattern as BatchUpload; the upload endpoint takes no imagePath).
await using var wavStream = row.WavFile!.OpenReadStream(MaxUploadBytes);
row.UploadedBytes = 0;
row.TotalBytes = row.WavFile!.Size;
await using var wavStream = row.WavFile.OpenReadStream(MaxUploadBytes);
// Re-render only on whole-percent change so a large upload paints ~100 frames,
// not thousands. Progress<T> marshals back onto the renderer dispatcher.
var lastPercent = -1;
var progress = new Progress<long>(written =>
{
row.UploadedBytes = written;
if (row.UploadPercent != lastPercent)
{
lastPercent = row.UploadPercent;
StateHasChanged();
}
});
var uploadResult = await CmsTrackService.UploadTrackAsync(
wavStream,
row.WavFile.Size,
row.WavFile.Name,
row.WavFile.ContentType,
row.TrackName,
@@ -491,7 +508,8 @@
createdByUserId,
_releaseType,
trackNumber,
_medium);
_medium,
progress);
if (!uploadResult.Success || uploadResult.Value is null)
{
@@ -27,6 +27,17 @@ public class BatchRowModel
public BatchRowStatus Status { get; set; } = BatchRowStatus.Queued;
public string? ErrorMessage { get; set; }
/// <summary>Bytes pushed to the wire so far for this row's in-flight upload. Reset per attempt.</summary>
public long UploadedBytes { get; set; }
/// <summary>Total payload bytes for this row (the WAV file size), the progress denominator.</summary>
public long TotalBytes { get; set; }
/// <summary>Upload completion as a 0100 percent, or 0 when the total is unknown.</summary>
public int UploadPercent => TotalBytes > 0
? (int)Math.Clamp(UploadedBytes * 100 / TotalBytes, 0, 100)
: 0;
}
public enum BatchRowStatus { Queued, Uploading, Done, Failed }
@@ -41,6 +41,13 @@
OnClick="@(() => OnRemove.InvokeAsync(index))"
aria-label="Remove track" />
</MudStack>
@if (row.Status == BatchRowStatus.Uploading)
{
<MudProgressLinear Color="Color.Info"
Value="@row.UploadPercent"
Class="mx-2 mb-2"
aria-label="@($"Uploading {row.TrackName}")" />
}
</div>
}
</MudList>
@@ -64,6 +64,12 @@
{
@* Track name is derived from the Release Name for Session/Mix — no separate input. *@
<MudText Typo="Typo.caption">Selected: @(_tracks[0].WavFile?.Name ?? "—")</MudText>
@if (_tracks[0].Status == BatchRowStatus.Uploading)
{
<MudProgressLinear Color="Color.Info"
Value="@_tracks[0].UploadPercent"
aria-label="Uploading track" />
}
}
</MudStack>
</MudPaper>
@@ -330,14 +336,33 @@
row.Status = BatchRowStatus.Uploading;
StateHasChanged();
row.UploadedBytes = 0;
row.TotalBytes = row.WavFile!.Size;
try
{
// OpenReadStream streams chunks from the browser via the SignalR circuit; the
// service wraps it in StreamContent so the whole file is never materialised in
// memory before DeepDrftAPI receives it.
await using var wavStream = row.WavFile!.OpenReadStream(MaxUploadBytes);
// service wraps it in ProgressStreamContent so the whole file is never materialised
// in memory before DeepDrftAPI receives it, and reports bytes-on-the-wire back here.
await using var wavStream = row.WavFile.OpenReadStream(MaxUploadBytes);
// Progress ticks fire ~once per 80 KB; re-render only when the whole-percent changes
// so a half-gig upload paints ~100 frames, not thousands. Progress<T> marshals the
// callback onto the component's renderer dispatcher, so StateHasChanged is safe here.
var lastPercent = -1;
var progress = new Progress<long>(written =>
{
row.UploadedBytes = written;
if (row.UploadPercent != lastPercent)
{
lastPercent = row.UploadPercent;
StateHasChanged();
}
});
var result = await CmsTrackService.UploadTrackAsync(
wavStream,
row.WavFile.Size,
row.WavFile.Name,
row.WavFile.ContentType,
row.TrackName,
@@ -350,7 +375,8 @@
createdByUserId,
_releaseType,
trackNumber,
_medium);
_medium,
progress);
if (!result.Success || result.Value is null)
{
+15 -2
View File
@@ -44,8 +44,10 @@ builder.Services.AddHttpClient("DeepDrft.Content", client =>
client.BaseAddress = new Uri(contentApiUrl);
});
// Named HttpClient for ApiKey-protected Content API calls (CmsTrackService's vault delete).
// API key baked into the default request headers so callers need not add it manually.
// Named HttpClient for ApiKey-protected Content API calls (CmsTrackService's non-upload operations:
// delete, paged list, metadata read/write, waveform jobs, releases, genres).
// Timeout left at the default 100s — these are short request/response pairs and an infinite timeout
// would hang an InteractiveServer circuit forever on a dead connection.
var contentApiKey = builder.Configuration["Api:ContentApiKey"]
?? throw new InvalidOperationException("Api:ContentApiKey is required");
builder.Services.AddHttpClient("DeepDrft.Content.Cms", client =>
@@ -54,6 +56,17 @@ builder.Services.AddHttpClient("DeepDrft.Content.Cms", client =>
client.DefaultRequestHeaders.Add("ApiKey", contentApiKey);
});
// Dedicated upload client — inherits the API key but removes the whole-request timeout.
// Large WAV uploads (several hundred MB) outrun the 100s default. The upload path enforces an
// idle/heartbeat deadline instead (body-streaming phase via ProgressStreamContent) plus a separate
// response-wait budget (CmsTrackService), so the client itself must not impose a total cap.
builder.Services.AddHttpClient("DeepDrft.Content.Cms.Upload", client =>
{
client.BaseAddress = new Uri(contentApiUrl);
client.DefaultRequestHeaders.Add("ApiKey", contentApiKey);
client.Timeout = Timeout.InfiniteTimeSpan;
});
// Reverse-proxy support (nginx in production).
builder.Services.Configure<ForwardedHeadersOptions>(options =>
{
+86 -3
View File
@@ -18,21 +18,43 @@ namespace DeepDrftManager.Services;
public class CmsTrackService : ICmsTrackService
{
private const string ContentCmsClientName = "DeepDrft.Content.Cms";
private const string UploadClientName = "DeepDrft.Content.Cms.Upload";
private const string UploadPath = "api/track/upload";
// Idle/heartbeat window: abort an upload only after this long with zero bytes written to the wire.
// The window resets on every progress tick, so a slow-but-moving half-gig upload never trips it;
// a genuinely stalled socket does. Governs the BODY-STREAMING phase only.
// Operator-tunable via Upload:IdleTimeoutSeconds.
private const int DefaultIdleTimeoutSeconds = 90;
// Response-wait budget: once the request body is fully on the wire the server runs AudioProcessor
// decode → vault write → SQL persist. For a several-hundred-MB WAV this can take many minutes.
// The idle heartbeat goes silent after the last byte, so a separate, larger deadline governs the
// response-wait phase so a fully-uploaded file is never killed mid-persist.
// Operator-tunable via Upload:ResponseTimeoutSeconds.
private const int DefaultResponseTimeoutSeconds = 600; // 10 minutes
private readonly IHttpClientFactory _httpClientFactory;
private readonly ILogger<CmsTrackService> _logger;
private readonly TimeSpan _uploadIdleTimeout;
private readonly TimeSpan _uploadResponseTimeout;
public CmsTrackService(
IHttpClientFactory httpClientFactory,
IConfiguration configuration,
ILogger<CmsTrackService> logger)
{
_httpClientFactory = httpClientFactory;
_logger = logger;
var idleSeconds = configuration.GetValue<int?>("Upload:IdleTimeoutSeconds") ?? DefaultIdleTimeoutSeconds;
_uploadIdleTimeout = TimeSpan.FromSeconds(idleSeconds > 0 ? idleSeconds : DefaultIdleTimeoutSeconds);
var responseSeconds = configuration.GetValue<int?>("Upload:ResponseTimeoutSeconds") ?? DefaultResponseTimeoutSeconds;
_uploadResponseTimeout = TimeSpan.FromSeconds(responseSeconds > 0 ? responseSeconds : DefaultResponseTimeoutSeconds);
}
public async Task<ResultContainer<TrackDto>> UploadTrackAsync(
Stream wavStream,
long contentLength,
string fileName,
string contentType,
string trackName,
@@ -46,12 +68,55 @@ public class CmsTrackService : ICmsTrackService
ReleaseType releaseType,
int trackNumber,
ReleaseMedium medium = ReleaseMedium.Cut,
IProgress<long>? progress = null,
CancellationToken ct = default)
{
// Two-phase cancellation for the upload send:
//
// BODY-STREAMING phase (while bytes are on the wire):
// idleCts fires if no progress tick arrives within the idle window. Each
// ProgressStreamContent chunk resets CancelAfter(idle), so a slow-but-moving
// upload never trips it; a genuinely stalled socket does.
//
// RESPONSE-WAIT phase (after the last byte, while the server persists):
// The idle heartbeat goes silent once the body is fully sent. responseCts is
// armed at that moment with a larger budget so a fully-uploaded file is never
// killed mid-persist. idleCts is simultaneously disarmed (CancelAfter(Infinite))
// so it cannot misfire during the response-wait.
//
// sendCts links both so either deadline — plus the caller's ct — cancels the send.
using var idleCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
idleCts.CancelAfter(_uploadIdleTimeout);
// responseCts starts disarmed; the body-complete callback below arms it.
using var responseCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
// Umbrella token passed to SendAsync — either phase token (or the caller) can cancel.
using var sendCts = CancellationTokenSource.CreateLinkedTokenSource(idleCts.Token, responseCts.Token);
// Rebuild the multipart container so the boundary is owned by HttpClient and the
// caller-supplied stream (already buffered by the SignalR upload) is the source.
using var multipart = new MultipartFormDataContent();
var wavContent = new StreamContent(wavStream);
var wavContent = new ProgressStreamContent(
wavStream,
contentLength,
written =>
{
// One mechanism, three consumers: advance the UI meter, reset the idle heartbeat,
// and on body-complete transition to the response-wait budget.
progress?.Report(written);
if (written < contentLength)
{
// Body still in flight — keep the idle heartbeat alive.
idleCts.CancelAfter(_uploadIdleTimeout);
}
else
{
// Last byte on the wire. Disarm the idle timer and start the response budget.
idleCts.CancelAfter(Timeout.InfiniteTimeSpan);
responseCts.CancelAfter(_uploadResponseTimeout);
}
});
wavContent.Headers.ContentType = new MediaTypeHeaderValue(
string.IsNullOrWhiteSpace(contentType) ? "audio/wav" : contentType);
multipart.Add(wavContent, "audioFile", fileName);
@@ -70,13 +135,31 @@ public class CmsTrackService : ICmsTrackService
// for an unrecognised value). Authoritative only when this upload creates the release.
multipart.Add(new StringContent(medium.ToString()), "medium");
var client = _httpClientFactory.CreateClient(ContentCmsClientName);
// Use the dedicated upload client (InfiniteTimeSpan) so the two-phase CTS logic above is the
// sole timeout authority. Non-upload operations use the bounded "DeepDrft.Content.Cms" client.
var client = _httpClientFactory.CreateClient(UploadClientName);
using var request = new HttpRequestMessage(HttpMethod.Post, UploadPath) { Content = multipart };
HttpResponseMessage response;
try
{
response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, ct);
response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, sendCts.Token);
}
catch (OperationCanceledException) when (!ct.IsCancellationRequested)
{
// Either idle window (body-streaming stall) or response-wait budget (server persist too slow).
if (idleCts.IsCancellationRequested)
{
_logger.LogWarning("Upload of {TrackName} stalled — no progress for {IdleSeconds}s; aborting.",
trackName, _uploadIdleTimeout.TotalSeconds);
return ResultContainer<TrackDto>.CreateFailResult(
$"Upload stalled — no data transferred for {_uploadIdleTimeout.TotalSeconds:0}s. Please retry.");
}
// responseCts fired: body reached the server but persist timed out.
_logger.LogWarning("Upload of {TrackName} timed out waiting for server response after {ResponseSeconds}s.",
trackName, _uploadResponseTimeout.TotalSeconds);
return ResultContainer<TrackDto>.CreateFailResult(
$"Upload timed out waiting for the server to respond after {_uploadResponseTimeout.TotalSeconds:0}s. Please retry.");
}
catch (Exception ex)
{
@@ -21,9 +21,14 @@ public interface ICmsTrackService
/// <paramref name="medium"/> sets the parent release's <see cref="ReleaseMedium"/> when this upload
/// creates the release. The medium is authoritative only on creation — adding a track to an existing
/// release never changes its medium (that is the edit path, <see cref="UpdateAsync"/>).
/// <paramref name="contentLength"/> is the total payload size (the browser file's <c>Size</c>); it
/// sets Content-Length and is the denominator for <paramref name="progress"/>, which reports cumulative
/// bytes pushed to the wire. Each progress tick also resets the idle/heartbeat upload timeout, so a
/// stalled connection aborts without a fixed total-duration cap.
/// </summary>
Task<ResultContainer<TrackDto>> UploadTrackAsync(
Stream wavStream,
long contentLength,
string fileName,
string contentType,
string trackName,
@@ -37,6 +42,7 @@ public interface ICmsTrackService
ReleaseType releaseType,
int trackNumber,
ReleaseMedium medium = ReleaseMedium.Cut,
IProgress<long>? progress = null,
CancellationToken ct = default);
/// <summary>
@@ -0,0 +1,64 @@
using System.Net;
namespace DeepDrftManager.Services;
/// <summary>
/// An <see cref="HttpContent"/> that streams a source stream to the wire while reporting cumulative
/// bytes written after each chunk. This is the single source of truth for both the upload progress
/// meter and the idle/heartbeat timeout: every reported tick advances the UI <em>and</em> resets the
/// idle deadline, so one mechanism feeds both concerns.
/// </summary>
/// <remarks>
/// Wrap the audio payload (not the whole multipart container) so <see cref="TryComputeLength"/>
/// returns the file length and the reported byte counts map directly onto "bytes of this file".
/// </remarks>
public sealed class ProgressStreamContent : HttpContent
{
// 80 KB: large enough to keep the socket fed on a healthy link, small enough that a stalled
// connection trips the idle window without a multi-MB write swallowing the whole heartbeat budget.
private const int CopyBufferSize = 81_920;
private readonly Stream _source;
private readonly long _length;
private readonly Action<long> _onBytesWritten;
/// <param name="source">The payload stream. Read once, sequentially — not seekable-rewound.</param>
/// <param name="length">Total bytes the source will yield; sets Content-Length and the meter denominator.</param>
/// <param name="onBytesWritten">Invoked after each chunk with the cumulative bytes written so far.</param>
public ProgressStreamContent(Stream source, long length, Action<long> onBytesWritten)
{
_source = source;
_length = length;
_onBytesWritten = onBytesWritten;
}
// Token-aware overload (.NET 5+): HttpClient calls this on the send path and passes the request's
// CancellationToken, so the idle-heartbeat CTS aborts an in-flight read/write promptly — not just
// between chunks. The parameterless base override delegates here with CancellationToken.None.
protected override Task SerializeToStreamAsync(Stream stream, TransportContext? context, CancellationToken cancellationToken)
=> CopyAsync(stream, cancellationToken);
protected override Task SerializeToStreamAsync(Stream stream, TransportContext? context)
=> CopyAsync(stream, CancellationToken.None);
private async Task CopyAsync(Stream stream, CancellationToken cancellationToken)
{
var buffer = new byte[CopyBufferSize];
long written = 0;
int read;
while ((read = await _source.ReadAsync(buffer, cancellationToken)) > 0)
{
await stream.WriteAsync(buffer.AsMemory(0, read), cancellationToken);
written += read;
// Report after the bytes are on the wire — a tick means real forward progress, which is
// exactly the signal the idle heartbeat must reset on.
_onBytesWritten(written);
}
}
protected override bool TryComputeLength(out long length)
{
length = _length;
return true;
}
}
+3
View File
@@ -30,6 +30,9 @@
<ItemGroup>
<ProjectReference Include="..\DeepDrftContent\DeepDrftContent.csproj" />
<ProjectReference Include="..\DeepDrftData\DeepDrftData.csproj" />
<!-- Referenced for ProgressStreamContent (the upload progress/heartbeat HttpContent). It is plain
HttpContent with no browser/host dependency, unit-testable by serializing to a MemoryStream. -->
<ProjectReference Include="..\DeepDrftManager\DeepDrftManager.csproj" />
<!-- Referenced for the client-side queue orchestrator (QueueService / IQueueService).
The queue is pure domain logic, unit-testable against a fake IStreamingPlayerService
with no browser/JS. -->
+241
View File
@@ -0,0 +1,241 @@
using System.Net.Http;
using DeepDrftManager.Services;
namespace DeepDrftTests;
/// <summary>
/// Unit tests for <see cref="ProgressStreamContent"/> — the single mechanism feeding both the CMS
/// upload progress meter and the idle/heartbeat timeout. Two concerns are anchored here:
/// (1) progress reporting is monotonic and sums to the total content length, and
/// (2) the idle deadline pattern the content drives (reset CancelAfter on each tick) cancels a
/// stalled write yet never fires while writes are progressing.
/// </summary>
[TestFixture]
public class ProgressStreamContentTests
{
// --- Progress reporting ---
[Test]
public async Task ReportsMonotonicallyIncreasingBytes_SummingToContentLength()
{
var payload = new byte[256 * 1024 + 7]; // non-chunk-aligned so the final partial read is exercised
Random.Shared.NextBytes(payload);
var reports = new List<long>();
var content = new ProgressStreamContent(
new MemoryStream(payload), payload.Length, written => reports.Add(written));
using var sink = new MemoryStream();
await content.CopyToAsync(sink);
Assert.That(reports, Is.Not.Empty, "Expected at least one progress tick.");
Assert.That(reports, Is.Ordered.Ascending, "Progress must be monotonically increasing.");
Assert.That(reports[^1], Is.EqualTo(payload.Length), "Final tick must equal total content length.");
Assert.That(sink.ToArray(), Is.EqualTo(payload), "Serialized bytes must match the source payload.");
}
[Test]
public void TryComputeLength_ReturnsProvidedContentLength()
{
const long declared = 987_654;
var content = new ProgressStreamContent(new MemoryStream(), declared, _ => { });
Assert.That(content.Headers.ContentLength, Is.EqualTo(declared));
}
// --- Idle/heartbeat cancellation ---
// The content does not own a timer; it owns the progress signal. The upload service wires that
// signal to CancellationTokenSource.CancelAfter(idle). These tests exercise that exact contract by
// driving the content with a stream whose read cadence we control.
[Test]
public async Task IdleTimeout_DoesNotFire_WhileWritesAreProgressing()
{
var idle = TimeSpan.FromMilliseconds(200);
using var idleCts = new CancellationTokenSource();
idleCts.CancelAfter(idle);
// Five chunks, each arriving well within the idle window: progress keeps resetting the deadline.
var source = new PacedStream(chunkCount: 5, chunkSize: 4096, delayPerChunk: TimeSpan.FromMilliseconds(50));
var content = new ProgressStreamContent(source, source.TotalLength, _ => idleCts.CancelAfter(idle));
using var sink = new MemoryStream();
await content.CopyToAsync(sink);
Assert.That(idleCts.IsCancellationRequested, Is.False,
"A steadily progressing upload must not trip the idle heartbeat.");
Assert.That(sink.Length, Is.EqualTo(source.TotalLength));
}
[Test]
public void IdleTimeout_Fires_WhenAStallExceedsTheIdleWindow()
{
var idle = TimeSpan.FromMilliseconds(150);
using var idleCts = new CancellationTokenSource();
idleCts.CancelAfter(idle);
// One quick chunk, then a stall longer than the idle window before the next read returns.
var source = new PacedStream(
chunkCount: 2, chunkSize: 4096,
delayPerChunk: TimeSpan.FromMilliseconds(10),
stallBeforeChunkIndex: 1, stallDuration: TimeSpan.FromMilliseconds(500));
var content = new ProgressStreamContent(source, source.TotalLength, _ => idleCts.CancelAfter(idle));
using var sink = new MemoryStream();
Assert.That(
async () => await content.CopyToAsync(sink, idleCts.Token),
Throws.InstanceOf<OperationCanceledException>(),
"A stall exceeding the idle window must cancel the in-flight copy.");
Assert.That(idleCts.IsCancellationRequested, Is.True);
}
// --- Two-phase deadline switching (Finding 1 regression test) ---
// After the last body byte is reported the idle timer must be disarmed and the response-wait
// budget must be armed. This test simulates the exact scenario that triggered Finding 1:
// the body streams quickly, then a long server-side lag (standing in for AudioProcessor +
// vault write + SQL persist) follows. The idle window is short; the response budget is long.
// With the fix the operation must complete; without it idleCts would fire during the lag.
[Test]
public async Task PostBodyLag_DoesNotTriggerIdleTimeout_WhenResponseBudgetIsLarger()
{
var idle = TimeSpan.FromMilliseconds(150);
var responseBudget = TimeSpan.FromMilliseconds(600);
using var idleCts = new CancellationTokenSource();
idleCts.CancelAfter(idle);
using var responseCts = new CancellationTokenSource();
// responseCts starts disarmed — same as in CmsTrackService.
using var sendCts = CancellationTokenSource.CreateLinkedTokenSource(idleCts.Token, responseCts.Token);
const long contentLength = 4096;
var source = new PacedStream(chunkCount: 1, chunkSize: (int)contentLength, delayPerChunk: TimeSpan.FromMilliseconds(10));
var content = new ProgressStreamContent(source, contentLength, written =>
{
if (written < contentLength)
{
idleCts.CancelAfter(idle);
}
else
{
// Body complete — disarm idle, arm response budget (mirrors CmsTrackService).
idleCts.CancelAfter(Timeout.InfiniteTimeSpan);
responseCts.CancelAfter(responseBudget);
}
});
using var sink = new MemoryStream();
await content.CopyToAsync(sink, sendCts.Token);
// Body is done. Simulate a slow server (longer than idle window, shorter than response budget).
var serverLag = TimeSpan.FromMilliseconds(300); // > idle (150 ms), < response budget (600 ms)
await Task.Delay(serverLag, sendCts.Token);
Assert.That(sendCts.IsCancellationRequested, Is.False,
"A post-body server lag within the response budget must not cancel the send token.");
Assert.That(idleCts.IsCancellationRequested, Is.False,
"The idle CTS must be disarmed after body completes.");
Assert.That(responseCts.IsCancellationRequested, Is.False,
"The response CTS must not have fired — server lag was within the response budget.");
}
[Test]
public async Task PostBodyLag_CancelsViaResponseCts_WhenResponseBudgetExceeded()
{
var idle = TimeSpan.FromMilliseconds(200);
var responseBudget = TimeSpan.FromMilliseconds(150);
using var idleCts = new CancellationTokenSource();
idleCts.CancelAfter(idle);
using var responseCts = new CancellationTokenSource();
using var sendCts = CancellationTokenSource.CreateLinkedTokenSource(idleCts.Token, responseCts.Token);
const long contentLength = 4096;
var source = new PacedStream(chunkCount: 1, chunkSize: (int)contentLength, delayPerChunk: TimeSpan.FromMilliseconds(10));
var content = new ProgressStreamContent(source, contentLength, written =>
{
if (written < contentLength)
{
idleCts.CancelAfter(idle);
}
else
{
idleCts.CancelAfter(Timeout.InfiniteTimeSpan);
responseCts.CancelAfter(responseBudget);
}
});
using var sink = new MemoryStream();
await content.CopyToAsync(sink, sendCts.Token);
// Simulate a slow server that exceeds the response budget.
var serverLag = TimeSpan.FromMilliseconds(400); // > response budget (150 ms)
Assert.That(
async () => await Task.Delay(serverLag, sendCts.Token),
Throws.InstanceOf<OperationCanceledException>(),
"A post-body lag exceeding the response budget must cancel via sendCts.");
Assert.That(responseCts.IsCancellationRequested, Is.True,
"responseCts must be the source of the cancellation, not idleCts.");
Assert.That(idleCts.IsCancellationRequested, Is.False,
"idleCts must remain disarmed — the response budget fired, not the idle window.");
}
/// <summary>
/// A read-only stream that yields a fixed number of equal chunks, pausing between reads to emulate
/// network pacing. Optionally inserts a longer stall before a given chunk to emulate a stalled link.
/// </summary>
private sealed class PacedStream : Stream
{
private readonly int _chunkCount;
private readonly int _chunkSize;
private readonly TimeSpan _delayPerChunk;
private readonly int _stallBeforeChunkIndex;
private readonly TimeSpan _stallDuration;
private int _chunksRead;
public PacedStream(int chunkCount, int chunkSize, TimeSpan delayPerChunk,
int stallBeforeChunkIndex = -1, TimeSpan stallDuration = default)
{
_chunkCount = chunkCount;
_chunkSize = chunkSize;
_delayPerChunk = delayPerChunk;
_stallBeforeChunkIndex = stallBeforeChunkIndex;
_stallDuration = stallDuration;
}
public long TotalLength => (long)_chunkCount * _chunkSize;
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
if (_chunksRead >= _chunkCount) return 0;
if (_chunksRead == _stallBeforeChunkIndex)
{
await Task.Delay(_stallDuration, cancellationToken);
}
else
{
await Task.Delay(_delayPerChunk, cancellationToken);
}
var count = Math.Min(_chunkSize, buffer.Length);
buffer.Span[..count].Clear();
_chunksRead++;
return count;
}
public override int Read(byte[] buffer, int offset, int count)
=> throw new NotSupportedException("Async-only paced stream.");
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => false;
public override long Length => TotalLength;
public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
public override void Flush() { }
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
public override void SetLength(long value) => throw new NotSupportedException();
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}
}