Fix large CMS upload timeout with idle heartbeat and add per-file progress meter
Replace the 100s default HttpClient timeout (set Timeout=Infinite) with an idle/heartbeat deadline driven by a ProgressStreamContent wrapper that reports bytes-on-the-wire. Each tick resets the idle window and advances a MudProgressLinear per upload row. Idle window is configurable via Upload:IdleTimeoutSeconds (default 90s).
This commit is contained in:
@@ -476,9 +476,26 @@
|
||||
{
|
||||
// New track — upload, then link cover art with a follow-up update (same
|
||||
// two-step pattern as BatchUpload; the upload endpoint takes no imagePath).
|
||||
await using var wavStream = row.WavFile!.OpenReadStream(MaxUploadBytes);
|
||||
row.UploadedBytes = 0;
|
||||
row.TotalBytes = row.WavFile!.Size;
|
||||
await using var wavStream = row.WavFile.OpenReadStream(MaxUploadBytes);
|
||||
|
||||
// Re-render only on whole-percent change so a large upload paints ~100 frames,
|
||||
// not thousands. Progress<T> marshals back onto the renderer dispatcher.
|
||||
var lastPercent = -1;
|
||||
var progress = new Progress<long>(written =>
|
||||
{
|
||||
row.UploadedBytes = written;
|
||||
if (row.UploadPercent != lastPercent)
|
||||
{
|
||||
lastPercent = row.UploadPercent;
|
||||
StateHasChanged();
|
||||
}
|
||||
});
|
||||
|
||||
var uploadResult = await CmsTrackService.UploadTrackAsync(
|
||||
wavStream,
|
||||
row.WavFile.Size,
|
||||
row.WavFile.Name,
|
||||
row.WavFile.ContentType,
|
||||
row.TrackName,
|
||||
@@ -491,7 +508,8 @@
|
||||
createdByUserId,
|
||||
_releaseType,
|
||||
trackNumber,
|
||||
_medium);
|
||||
_medium,
|
||||
progress);
|
||||
|
||||
if (!uploadResult.Success || uploadResult.Value is null)
|
||||
{
|
||||
|
||||
@@ -27,6 +27,17 @@ public class BatchRowModel
|
||||
public BatchRowStatus Status { get; set; } = BatchRowStatus.Queued;
|
||||
|
||||
public string? ErrorMessage { get; set; }
|
||||
|
||||
/// <summary>Bytes pushed to the wire so far for this row's in-flight upload. Reset per attempt.</summary>
|
||||
public long UploadedBytes { get; set; }
|
||||
|
||||
/// <summary>Total payload bytes for this row (the WAV file size), the progress denominator.</summary>
|
||||
public long TotalBytes { get; set; }
|
||||
|
||||
/// <summary>Upload completion as a 0–100 percent, or 0 when the total is unknown.</summary>
|
||||
public int UploadPercent => TotalBytes > 0
|
||||
? (int)Math.Clamp(UploadedBytes * 100 / TotalBytes, 0, 100)
|
||||
: 0;
|
||||
}
|
||||
|
||||
public enum BatchRowStatus { Queued, Uploading, Done, Failed }
|
||||
|
||||
@@ -41,6 +41,13 @@
|
||||
OnClick="@(() => OnRemove.InvokeAsync(index))"
|
||||
aria-label="Remove track" />
|
||||
</MudStack>
|
||||
@if (row.Status == BatchRowStatus.Uploading)
|
||||
{
|
||||
<MudProgressLinear Color="Color.Info"
|
||||
Value="@row.UploadPercent"
|
||||
Class="mx-2 mb-2"
|
||||
aria-label="@($"Uploading {row.TrackName}")" />
|
||||
}
|
||||
</div>
|
||||
}
|
||||
</MudList>
|
||||
|
||||
@@ -64,6 +64,12 @@
|
||||
{
|
||||
@* Track name is derived from the Release Name for Session/Mix — no separate input. *@
|
||||
<MudText Typo="Typo.caption">Selected: @(_tracks[0].WavFile?.Name ?? "—")</MudText>
|
||||
@if (_tracks[0].Status == BatchRowStatus.Uploading)
|
||||
{
|
||||
<MudProgressLinear Color="Color.Info"
|
||||
Value="@_tracks[0].UploadPercent"
|
||||
aria-label="Uploading track" />
|
||||
}
|
||||
}
|
||||
</MudStack>
|
||||
</MudPaper>
|
||||
@@ -330,14 +336,33 @@
|
||||
row.Status = BatchRowStatus.Uploading;
|
||||
StateHasChanged();
|
||||
|
||||
row.UploadedBytes = 0;
|
||||
row.TotalBytes = row.WavFile!.Size;
|
||||
|
||||
try
|
||||
{
|
||||
// OpenReadStream streams chunks from the browser via the SignalR circuit; the
|
||||
// service wraps it in StreamContent so the whole file is never materialised in
|
||||
// memory before DeepDrftAPI receives it.
|
||||
await using var wavStream = row.WavFile!.OpenReadStream(MaxUploadBytes);
|
||||
// service wraps it in ProgressStreamContent so the whole file is never materialised
|
||||
// in memory before DeepDrftAPI receives it, and reports bytes-on-the-wire back here.
|
||||
await using var wavStream = row.WavFile.OpenReadStream(MaxUploadBytes);
|
||||
|
||||
// Progress ticks fire ~once per 80 KB; re-render only when the whole-percent changes
|
||||
// so a half-gig upload paints ~100 frames, not thousands. Progress<T> marshals the
|
||||
// callback onto the component's renderer dispatcher, so StateHasChanged is safe here.
|
||||
var lastPercent = -1;
|
||||
var progress = new Progress<long>(written =>
|
||||
{
|
||||
row.UploadedBytes = written;
|
||||
if (row.UploadPercent != lastPercent)
|
||||
{
|
||||
lastPercent = row.UploadPercent;
|
||||
StateHasChanged();
|
||||
}
|
||||
});
|
||||
|
||||
var result = await CmsTrackService.UploadTrackAsync(
|
||||
wavStream,
|
||||
row.WavFile.Size,
|
||||
row.WavFile.Name,
|
||||
row.WavFile.ContentType,
|
||||
row.TrackName,
|
||||
@@ -350,7 +375,8 @@
|
||||
createdByUserId,
|
||||
_releaseType,
|
||||
trackNumber,
|
||||
_medium);
|
||||
_medium,
|
||||
progress);
|
||||
|
||||
if (!result.Success || result.Value is null)
|
||||
{
|
||||
|
||||
@@ -52,6 +52,10 @@ builder.Services.AddHttpClient("DeepDrft.Content.Cms", client =>
|
||||
{
|
||||
client.BaseAddress = new Uri(contentApiUrl);
|
||||
client.DefaultRequestHeaders.Add("ApiKey", contentApiKey);
|
||||
// Large mix uploads (several hundred MB) outrun the 100s default whole-request timeout. The send
|
||||
// path enforces an idle/heartbeat deadline instead (CmsTrackService), which can only express
|
||||
// "no bytes for N seconds" if the client itself does not impose a total cap — hence Infinite here.
|
||||
client.Timeout = Timeout.InfiniteTimeSpan;
|
||||
});
|
||||
|
||||
// Reverse-proxy support (nginx in production).
|
||||
|
||||
@@ -20,19 +20,29 @@ public class CmsTrackService : ICmsTrackService
|
||||
private const string ContentCmsClientName = "DeepDrft.Content.Cms";
|
||||
private const string UploadPath = "api/track/upload";
|
||||
|
||||
// Idle/heartbeat window: abort an upload only after this long with zero bytes written to the wire.
|
||||
// The window resets on every progress tick, so a slow-but-moving half-gig upload never trips it;
|
||||
// a genuinely stalled socket does. Operator-tunable via Upload:IdleTimeoutSeconds.
|
||||
private const int DefaultIdleTimeoutSeconds = 90;
|
||||
|
||||
private readonly IHttpClientFactory _httpClientFactory;
|
||||
private readonly ILogger<CmsTrackService> _logger;
|
||||
private readonly TimeSpan _uploadIdleTimeout;
|
||||
|
||||
public CmsTrackService(
|
||||
IHttpClientFactory httpClientFactory,
|
||||
IConfiguration configuration,
|
||||
ILogger<CmsTrackService> logger)
|
||||
{
|
||||
_httpClientFactory = httpClientFactory;
|
||||
_logger = logger;
|
||||
var seconds = configuration.GetValue<int?>("Upload:IdleTimeoutSeconds") ?? DefaultIdleTimeoutSeconds;
|
||||
_uploadIdleTimeout = TimeSpan.FromSeconds(seconds > 0 ? seconds : DefaultIdleTimeoutSeconds);
|
||||
}
|
||||
|
||||
public async Task<ResultContainer<TrackDto>> UploadTrackAsync(
|
||||
Stream wavStream,
|
||||
long contentLength,
|
||||
string fileName,
|
||||
string contentType,
|
||||
string trackName,
|
||||
@@ -46,12 +56,25 @@ public class CmsTrackService : ICmsTrackService
|
||||
ReleaseType releaseType,
|
||||
int trackNumber,
|
||||
ReleaseMedium medium = ReleaseMedium.Cut,
|
||||
IProgress<long>? progress = null,
|
||||
CancellationToken ct = default)
|
||||
{
|
||||
// Idle/heartbeat cancellation: HttpClient.Timeout is a whole-request cap and cannot express
|
||||
// "no bytes for N seconds", so the named client runs with InfiniteTimeSpan and the deadline
|
||||
// lives here. Each ProgressStreamContent tick resets CancelAfter(idle); a stalled socket lets
|
||||
// the window elapse and cancels the send. Linked to the caller's ct so a page cancel still wins.
|
||||
using var idleCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
||||
idleCts.CancelAfter(_uploadIdleTimeout);
|
||||
|
||||
// Rebuild the multipart container so the boundary is owned by HttpClient and the
|
||||
// caller-supplied stream (already buffered by the SignalR upload) is the source.
|
||||
using var multipart = new MultipartFormDataContent();
|
||||
var wavContent = new StreamContent(wavStream);
|
||||
var wavContent = new ProgressStreamContent(wavStream, contentLength, written =>
|
||||
{
|
||||
// One mechanism, two consumers: advance the UI meter and reset the idle heartbeat.
|
||||
progress?.Report(written);
|
||||
idleCts.CancelAfter(_uploadIdleTimeout);
|
||||
});
|
||||
wavContent.Headers.ContentType = new MediaTypeHeaderValue(
|
||||
string.IsNullOrWhiteSpace(contentType) ? "audio/wav" : contentType);
|
||||
multipart.Add(wavContent, "audioFile", fileName);
|
||||
@@ -76,7 +99,15 @@ public class CmsTrackService : ICmsTrackService
|
||||
HttpResponseMessage response;
|
||||
try
|
||||
{
|
||||
response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, ct);
|
||||
response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, idleCts.Token);
|
||||
}
|
||||
catch (OperationCanceledException) when (idleCts.IsCancellationRequested && !ct.IsCancellationRequested)
|
||||
{
|
||||
// Idle window elapsed with no bytes moving — a stalled connection, not a caller cancel.
|
||||
_logger.LogWarning("Upload of {TrackName} stalled — no progress for {IdleSeconds}s; aborting.",
|
||||
trackName, _uploadIdleTimeout.TotalSeconds);
|
||||
return ResultContainer<TrackDto>.CreateFailResult(
|
||||
$"Upload stalled — no data transferred for {_uploadIdleTimeout.TotalSeconds:0}s. Please retry.");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
|
||||
@@ -21,9 +21,14 @@ public interface ICmsTrackService
|
||||
/// <paramref name="medium"/> sets the parent release's <see cref="ReleaseMedium"/> when this upload
|
||||
/// creates the release. The medium is authoritative only on creation — adding a track to an existing
|
||||
/// release never changes its medium (that is the edit path, <see cref="UpdateAsync"/>).
|
||||
/// <paramref name="contentLength"/> is the total payload size (the browser file's <c>Size</c>); it
|
||||
/// sets Content-Length and is the denominator for <paramref name="progress"/>, which reports cumulative
|
||||
/// bytes pushed to the wire. Each progress tick also resets the idle/heartbeat upload timeout, so a
|
||||
/// stalled connection aborts without a fixed total-duration cap.
|
||||
/// </summary>
|
||||
Task<ResultContainer<TrackDto>> UploadTrackAsync(
|
||||
Stream wavStream,
|
||||
long contentLength,
|
||||
string fileName,
|
||||
string contentType,
|
||||
string trackName,
|
||||
@@ -37,6 +42,7 @@ public interface ICmsTrackService
|
||||
ReleaseType releaseType,
|
||||
int trackNumber,
|
||||
ReleaseMedium medium = ReleaseMedium.Cut,
|
||||
IProgress<long>? progress = null,
|
||||
CancellationToken ct = default);
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -0,0 +1,64 @@
|
||||
using System.Net;
|
||||
|
||||
namespace DeepDrftManager.Services;
|
||||
|
||||
/// <summary>
|
||||
/// An <see cref="HttpContent"/> that streams a source stream to the wire while reporting cumulative
|
||||
/// bytes written after each chunk. This is the single source of truth for both the upload progress
|
||||
/// meter and the idle/heartbeat timeout: every reported tick advances the UI <em>and</em> resets the
|
||||
/// idle deadline, so one mechanism feeds both concerns.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Wrap the audio payload (not the whole multipart container) so <see cref="TryComputeLength"/>
|
||||
/// returns the file length and the reported byte counts map directly onto "bytes of this file".
|
||||
/// </remarks>
|
||||
public sealed class ProgressStreamContent : HttpContent
|
||||
{
|
||||
// 80 KB: large enough to keep the socket fed on a healthy link, small enough that a stalled
|
||||
// connection trips the idle window without a multi-MB write swallowing the whole heartbeat budget.
|
||||
private const int CopyBufferSize = 81_920;
|
||||
|
||||
private readonly Stream _source;
|
||||
private readonly long _length;
|
||||
private readonly Action<long> _onBytesWritten;
|
||||
|
||||
/// <param name="source">The payload stream. Read once, sequentially — not seekable-rewound.</param>
|
||||
/// <param name="length">Total bytes the source will yield; sets Content-Length and the meter denominator.</param>
|
||||
/// <param name="onBytesWritten">Invoked after each chunk with the cumulative bytes written so far.</param>
|
||||
public ProgressStreamContent(Stream source, long length, Action<long> onBytesWritten)
|
||||
{
|
||||
_source = source;
|
||||
_length = length;
|
||||
_onBytesWritten = onBytesWritten;
|
||||
}
|
||||
|
||||
// Token-aware overload (.NET 5+): HttpClient calls this on the send path and passes the request's
|
||||
// CancellationToken, so the idle-heartbeat CTS aborts an in-flight read/write promptly — not just
|
||||
// between chunks. The parameterless base override delegates here with CancellationToken.None.
|
||||
protected override Task SerializeToStreamAsync(Stream stream, TransportContext? context, CancellationToken cancellationToken)
|
||||
=> CopyAsync(stream, cancellationToken);
|
||||
|
||||
protected override Task SerializeToStreamAsync(Stream stream, TransportContext? context)
|
||||
=> CopyAsync(stream, CancellationToken.None);
|
||||
|
||||
private async Task CopyAsync(Stream stream, CancellationToken cancellationToken)
|
||||
{
|
||||
var buffer = new byte[CopyBufferSize];
|
||||
long written = 0;
|
||||
int read;
|
||||
while ((read = await _source.ReadAsync(buffer, cancellationToken)) > 0)
|
||||
{
|
||||
await stream.WriteAsync(buffer.AsMemory(0, read), cancellationToken);
|
||||
written += read;
|
||||
// Report after the bytes are on the wire — a tick means real forward progress, which is
|
||||
// exactly the signal the idle heartbeat must reset on.
|
||||
_onBytesWritten(written);
|
||||
}
|
||||
}
|
||||
|
||||
protected override bool TryComputeLength(out long length)
|
||||
{
|
||||
length = _length;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -30,6 +30,9 @@
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\DeepDrftContent\DeepDrftContent.csproj" />
|
||||
<ProjectReference Include="..\DeepDrftData\DeepDrftData.csproj" />
|
||||
<!-- Referenced for ProgressStreamContent (the upload progress/heartbeat HttpContent). It is plain
|
||||
HttpContent with no browser/host dependency, unit-testable by serializing to a MemoryStream. -->
|
||||
<ProjectReference Include="..\DeepDrftManager\DeepDrftManager.csproj" />
|
||||
<!-- Referenced for the client-side queue orchestrator (QueueService / IQueueService).
|
||||
The queue is pure domain logic, unit-testable against a fake IStreamingPlayerService
|
||||
with no browser/JS. -->
|
||||
|
||||
@@ -0,0 +1,150 @@
|
||||
using System.Net.Http;
|
||||
using DeepDrftManager.Services;
|
||||
|
||||
namespace DeepDrftTests;
|
||||
|
||||
/// <summary>
|
||||
/// Unit tests for <see cref="ProgressStreamContent"/> — the single mechanism feeding both the CMS
|
||||
/// upload progress meter and the idle/heartbeat timeout. Two concerns are anchored here:
|
||||
/// (1) progress reporting is monotonic and sums to the total content length, and
|
||||
/// (2) the idle deadline pattern the content drives (reset CancelAfter on each tick) cancels a
|
||||
/// stalled write yet never fires while writes are progressing.
|
||||
/// </summary>
|
||||
[TestFixture]
|
||||
public class ProgressStreamContentTests
|
||||
{
|
||||
// --- Progress reporting ---
|
||||
|
||||
[Test]
|
||||
public async Task ReportsMonotonicallyIncreasingBytes_SummingToContentLength()
|
||||
{
|
||||
var payload = new byte[256 * 1024 + 7]; // non-chunk-aligned so the final partial read is exercised
|
||||
Random.Shared.NextBytes(payload);
|
||||
var reports = new List<long>();
|
||||
|
||||
var content = new ProgressStreamContent(
|
||||
new MemoryStream(payload), payload.Length, written => reports.Add(written));
|
||||
|
||||
using var sink = new MemoryStream();
|
||||
await content.CopyToAsync(sink);
|
||||
|
||||
Assert.That(reports, Is.Not.Empty, "Expected at least one progress tick.");
|
||||
Assert.That(reports, Is.Ordered.Ascending, "Progress must be monotonically increasing.");
|
||||
Assert.That(reports[^1], Is.EqualTo(payload.Length), "Final tick must equal total content length.");
|
||||
Assert.That(sink.ToArray(), Is.EqualTo(payload), "Serialized bytes must match the source payload.");
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void TryComputeLength_ReturnsProvidedContentLength()
|
||||
{
|
||||
const long declared = 987_654;
|
||||
var content = new ProgressStreamContent(new MemoryStream(), declared, _ => { });
|
||||
|
||||
Assert.That(content.Headers.ContentLength, Is.EqualTo(declared));
|
||||
}
|
||||
|
||||
// --- Idle/heartbeat cancellation ---
|
||||
// The content does not own a timer; it owns the progress signal. The upload service wires that
|
||||
// signal to CancellationTokenSource.CancelAfter(idle). These tests exercise that exact contract by
|
||||
// driving the content with a stream whose read cadence we control.
|
||||
|
||||
[Test]
|
||||
public async Task IdleTimeout_DoesNotFire_WhileWritesAreProgressing()
|
||||
{
|
||||
var idle = TimeSpan.FromMilliseconds(200);
|
||||
using var idleCts = new CancellationTokenSource();
|
||||
idleCts.CancelAfter(idle);
|
||||
|
||||
// Five chunks, each arriving well within the idle window: progress keeps resetting the deadline.
|
||||
var source = new PacedStream(chunkCount: 5, chunkSize: 4096, delayPerChunk: TimeSpan.FromMilliseconds(50));
|
||||
var content = new ProgressStreamContent(source, source.TotalLength, _ => idleCts.CancelAfter(idle));
|
||||
|
||||
using var sink = new MemoryStream();
|
||||
await content.CopyToAsync(sink);
|
||||
|
||||
Assert.That(idleCts.IsCancellationRequested, Is.False,
|
||||
"A steadily progressing upload must not trip the idle heartbeat.");
|
||||
Assert.That(sink.Length, Is.EqualTo(source.TotalLength));
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void IdleTimeout_Fires_WhenAStallExceedsTheIdleWindow()
|
||||
{
|
||||
var idle = TimeSpan.FromMilliseconds(150);
|
||||
using var idleCts = new CancellationTokenSource();
|
||||
idleCts.CancelAfter(idle);
|
||||
|
||||
// One quick chunk, then a stall longer than the idle window before the next read returns.
|
||||
var source = new PacedStream(
|
||||
chunkCount: 2, chunkSize: 4096,
|
||||
delayPerChunk: TimeSpan.FromMilliseconds(10),
|
||||
stallBeforeChunkIndex: 1, stallDuration: TimeSpan.FromMilliseconds(500));
|
||||
var content = new ProgressStreamContent(source, source.TotalLength, _ => idleCts.CancelAfter(idle));
|
||||
|
||||
using var sink = new MemoryStream();
|
||||
|
||||
Assert.That(
|
||||
async () => await content.CopyToAsync(sink, idleCts.Token),
|
||||
Throws.InstanceOf<OperationCanceledException>(),
|
||||
"A stall exceeding the idle window must cancel the in-flight copy.");
|
||||
Assert.That(idleCts.IsCancellationRequested, Is.True);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// A read-only stream that yields a fixed number of equal chunks, pausing between reads to emulate
|
||||
/// network pacing. Optionally inserts a longer stall before a given chunk to emulate a stalled link.
|
||||
/// </summary>
|
||||
private sealed class PacedStream : Stream
|
||||
{
|
||||
private readonly int _chunkCount;
|
||||
private readonly int _chunkSize;
|
||||
private readonly TimeSpan _delayPerChunk;
|
||||
private readonly int _stallBeforeChunkIndex;
|
||||
private readonly TimeSpan _stallDuration;
|
||||
private int _chunksRead;
|
||||
|
||||
public PacedStream(int chunkCount, int chunkSize, TimeSpan delayPerChunk,
|
||||
int stallBeforeChunkIndex = -1, TimeSpan stallDuration = default)
|
||||
{
|
||||
_chunkCount = chunkCount;
|
||||
_chunkSize = chunkSize;
|
||||
_delayPerChunk = delayPerChunk;
|
||||
_stallBeforeChunkIndex = stallBeforeChunkIndex;
|
||||
_stallDuration = stallDuration;
|
||||
}
|
||||
|
||||
public long TotalLength => (long)_chunkCount * _chunkSize;
|
||||
|
||||
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (_chunksRead >= _chunkCount) return 0;
|
||||
|
||||
if (_chunksRead == _stallBeforeChunkIndex)
|
||||
{
|
||||
await Task.Delay(_stallDuration, cancellationToken);
|
||||
}
|
||||
else
|
||||
{
|
||||
await Task.Delay(_delayPerChunk, cancellationToken);
|
||||
}
|
||||
|
||||
var count = Math.Min(_chunkSize, buffer.Length);
|
||||
buffer.Span[..count].Clear();
|
||||
_chunksRead++;
|
||||
return count;
|
||||
}
|
||||
|
||||
public override int Read(byte[] buffer, int offset, int count)
|
||||
=> throw new NotSupportedException("Async-only paced stream.");
|
||||
|
||||
public override bool CanRead => true;
|
||||
public override bool CanSeek => false;
|
||||
public override bool CanWrite => false;
|
||||
public override long Length => TotalLength;
|
||||
public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
|
||||
public override void Flush() { }
|
||||
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
|
||||
public override void SetLength(long value) => throw new NotSupportedException();
|
||||
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user