Stream the waveform compute so large uploads no longer buffer the whole file (Wave 2 OOM)
This commit is contained in:
@@ -17,6 +17,10 @@ public class WaveformProfileService
|
||||
{
|
||||
private const string ProfileExtension = ".wfp";
|
||||
|
||||
/// <summary>Bounded read-buffer size for the streaming PCM pass — the only filesize-independent
|
||||
/// allocation on the streaming path (matches the store path's 80 KB copy buffer).</summary>
|
||||
private const int StreamReadBufferSize = 81920;
|
||||
|
||||
private readonly FileDb _fileDatabase;
|
||||
private readonly AudioProcessor _audioProcessor;
|
||||
private readonly ILoudnessAlgorithm _loudnessAlgorithm;
|
||||
@@ -117,6 +121,161 @@ public class WaveformProfileService
|
||||
return ComputeAndStoreAsync(wavBytes, entryKey, bucketCount, VaultConstants.TrackWaveforms);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Streaming counterpart of <see cref="ComputeAndStoreAsync"/>: computes and stores the fixed
|
||||
/// 512-bucket player-bar profile by reading the WAV from <paramref name="openWavStream"/> in bounded
|
||||
/// chunks, never materializing the whole file in a managed <c>byte[]</c>. Tri-state result matches
|
||||
/// the <c>RemoveResourceAsync</c> idiom so callers can map outcomes precisely: <c>null</c> = no audio
|
||||
/// stream available (the entry has no backing audio); <c>false</c> = audio present but no profile
|
||||
/// computable (non-WAV / float / padded) or the vault write failed; <c>true</c> = stored. Output is
|
||||
/// byte-identical to the whole-buffer path for the same WAV.
|
||||
/// </summary>
|
||||
public Task<bool?> ComputeAndStoreProfileStreamingAsync(
|
||||
Func<CancellationToken, Task<Stream?>> openWavStream,
|
||||
string entryKey,
|
||||
CancellationToken ct = default) =>
|
||||
RunStreamingAsync(
|
||||
openWavStream, entryKey,
|
||||
[(_options.BucketCount, VaultConstants.WaveformProfiles)], ct);
|
||||
|
||||
/// <summary>
|
||||
/// Streaming counterpart of <see cref="ComputeAndStoreHighResAsync"/>: computes and stores the
|
||||
/// duration-derived high-res datum (<see cref="VaultConstants.TrackWaveforms"/>) by streaming the WAV
|
||||
/// from <paramref name="openWavStream"/>. <paramref name="durationSeconds"/> drives the bucket count
|
||||
/// exactly as the whole-buffer path's <c>audio.Duration</c> did — pass the same vault-metadata
|
||||
/// duration to keep the stored bytes identical. Tri-state result as in
|
||||
/// <see cref="ComputeAndStoreProfileStreamingAsync"/>.
|
||||
/// </summary>
|
||||
public Task<bool?> ComputeAndStoreHighResStreamingAsync(
|
||||
Func<CancellationToken, Task<Stream?>> openWavStream,
|
||||
string entryKey,
|
||||
double durationSeconds,
|
||||
CancellationToken ct = default) =>
|
||||
RunStreamingAsync(
|
||||
openWavStream, entryKey,
|
||||
[(WaveformResolution.BucketCountForDuration(durationSeconds), VaultConstants.TrackWaveforms)], ct);
|
||||
|
||||
/// <summary>
|
||||
/// Computes and stores BOTH datums a track carries — the 512-bucket profile and the duration-derived
|
||||
/// high-res datum — from a SINGLE streaming pass over the WAV. One sequential read of the (possibly
|
||||
/// ~GB) audio feeds two independent accumulators, so memory stays O(bucket arrays + read buffer) and
|
||||
/// disk I/O is halved versus two separate passes. This is the upload / replace-audio hot path. Each
|
||||
/// datum's stored bytes are byte-identical to its whole-buffer counterpart. Tri-state: <c>null</c> =
|
||||
/// no audio stream; <c>false</c> = not WAV-decodable or a vault write failed; <c>true</c> = both
|
||||
/// datums stored. Best-effort callers ignore the result.
|
||||
/// </summary>
|
||||
public Task<bool?> ComputeAndStoreAllStreamingAsync(
|
||||
Func<CancellationToken, Task<Stream?>> openWavStream,
|
||||
string entryKey,
|
||||
double durationSeconds,
|
||||
CancellationToken ct = default) =>
|
||||
RunStreamingAsync(
|
||||
openWavStream, entryKey,
|
||||
[
|
||||
(_options.BucketCount, VaultConstants.WaveformProfiles),
|
||||
(WaveformResolution.BucketCountForDuration(durationSeconds), VaultConstants.TrackWaveforms),
|
||||
],
|
||||
ct);
|
||||
|
||||
/// <summary>
|
||||
/// Core streaming reduction: opens the WAV once, parses its header (bounded), then streams the PCM
|
||||
/// data region through one loudness accumulator per requested target, storing each datum. All
|
||||
/// targets are computed in the single pass. See the tri-state contract on the public wrappers.
|
||||
/// </summary>
|
||||
private async Task<bool?> RunStreamingAsync(
|
||||
Func<CancellationToken, Task<Stream?>> openWavStream,
|
||||
string entryKey,
|
||||
IReadOnlyList<(int BucketCount, string VaultName)> targets,
|
||||
CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
await using var stream = await openWavStream(ct);
|
||||
if (stream is null)
|
||||
{
|
||||
// No backing audio for this entry — distinct from "present but undecodable".
|
||||
return null;
|
||||
}
|
||||
|
||||
var info = await _audioProcessor.TryReadPcmStreamInfoAsync(stream, stream.Length, ct);
|
||||
if (info is null)
|
||||
{
|
||||
_logger.LogWarning(
|
||||
"Waveform profile not computed for {EntryKey}: WAV PCM could not be extracted (streaming).",
|
||||
entryKey);
|
||||
return false;
|
||||
}
|
||||
|
||||
var v = info.Value;
|
||||
var accumulators = new ILoudnessAccumulator[targets.Count];
|
||||
for (var i = 0; i < targets.Count; i++)
|
||||
{
|
||||
accumulators[i] = _loudnessAlgorithm.CreateAccumulator(
|
||||
v.DataLength, v.Channels, v.SampleRate, v.BitsPerSample, targets[i].BucketCount);
|
||||
}
|
||||
|
||||
await StreamPcmThroughAsync(stream, v.DataStart, v.DataLength, accumulators, ct);
|
||||
|
||||
_logger.LogInformation(
|
||||
"Streaming waveform compute for {EntryKey}: {DataLength} PCM bytes, {TargetCount} datum(s), " +
|
||||
"{BufferSize}B read buffer — no whole-file load.",
|
||||
entryKey, v.DataLength, targets.Count, StreamReadBufferSize);
|
||||
|
||||
var allStored = true;
|
||||
for (var i = 0; i < targets.Count; i++)
|
||||
{
|
||||
var profile = accumulators[i].Finish();
|
||||
var quantized = Quantize(profile);
|
||||
|
||||
await EnsureVaultAsync(targets[i].VaultName);
|
||||
var binary = new MediaBinary(new MediaBinaryParams(quantized, quantized.Length, ProfileExtension));
|
||||
var stored = await _fileDatabase.RegisterResourceAsync(targets[i].VaultName, entryKey, binary);
|
||||
if (!stored)
|
||||
{
|
||||
_logger.LogWarning(
|
||||
"Waveform vault write failed for {EntryKey} in {VaultName}.", entryKey, targets[i].VaultName);
|
||||
allStored = false;
|
||||
}
|
||||
}
|
||||
|
||||
return allStored;
|
||||
}
|
||||
catch (Exception ex) when (ex is not OperationCanceledException)
|
||||
{
|
||||
_logger.LogError(ex, "Streaming waveform computation failed for {EntryKey}.", entryKey);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Seeks to the PCM data region and streams exactly <paramref name="dataLength"/> bytes through each
|
||||
/// accumulator in bounded reads. The accumulators carry partial frames internally, so the read
|
||||
/// boundaries need not align to frames. Peak memory is one read buffer — independent of file size.
|
||||
/// </summary>
|
||||
private static async Task StreamPcmThroughAsync(
|
||||
Stream stream, long dataStart, long dataLength, ILoudnessAccumulator[] accumulators, CancellationToken ct)
|
||||
{
|
||||
stream.Seek(dataStart, SeekOrigin.Begin);
|
||||
|
||||
var buffer = new byte[StreamReadBufferSize];
|
||||
var remaining = dataLength;
|
||||
while (remaining > 0)
|
||||
{
|
||||
var want = (int)Math.Min(buffer.Length, remaining);
|
||||
var read = await stream.ReadAsync(buffer.AsMemory(0, want), ct);
|
||||
if (read == 0)
|
||||
break;
|
||||
|
||||
var span = buffer.AsSpan(0, read);
|
||||
foreach (var accumulator in accumulators)
|
||||
{
|
||||
accumulator.Add(span);
|
||||
}
|
||||
|
||||
remaining -= read;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns the stored quantized profile bytes for a track from <paramref name="vaultName"/>
|
||||
/// (defaults to <see cref="VaultConstants.WaveformProfiles"/> when null), or null if no profile
|
||||
|
||||
Reference in New Issue
Block a user