From 9347f11ff0be8a4e2ce3d6236eeb9eae36e849fc Mon Sep 17 00:00:00 2001 From: daniel-c-harvey Date: Thu, 25 Jun 2026 21:49:11 -0400 Subject: [PATCH] Stream the waveform compute so large uploads no longer buffer the whole file (Wave 2 OOM) --- DeepDrftAPI/Controllers/ReleaseController.cs | 8 +- DeepDrftAPI/Controllers/TrackController.cs | 32 ++- DeepDrftAPI/Services/UnifiedReleaseService.cs | 21 +- DeepDrftAPI/Services/UnifiedTrackService.cs | 28 +- DeepDrftContent/Processors/AudioProcessor.cs | 116 ++++++++ .../Processors/ILoudnessAlgorithm.cs | 42 +++ .../Processors/RmsLoudnessAlgorithm.cs | 269 ++++++++++++------ .../Processors/WaveformProfileService.cs | 159 +++++++++++ DeepDrftContent/TrackContentService.cs | 39 +++ DeepDrftTests/WaveformStreamingParityTests.cs | Bin 0 -> 14480 bytes 10 files changed, 594 insertions(+), 120 deletions(-) create mode 100644 DeepDrftTests/WaveformStreamingParityTests.cs diff --git a/DeepDrftAPI/Controllers/ReleaseController.cs b/DeepDrftAPI/Controllers/ReleaseController.cs index caa887f..2e042cc 100644 --- a/DeepDrftAPI/Controllers/ReleaseController.cs +++ b/DeepDrftAPI/Controllers/ReleaseController.cs @@ -112,10 +112,10 @@ public class ReleaseController : ControllerBase } // POST api/release/{id}/mix/waveform ([ApiKeyAuthorize], no body) - // Server-side trigger: fetch the Mix's track audio from the vault, compute a duration-derived high-res - // waveform via ComputeAndStoreHighResAsync, store it in the track-waveforms vault, and set - // MixMetadata.WaveformEntryKey. 404 when the release is missing or has no stored audio; 500 on - // compute/storage failure. Declared before "{id:long}". + // Server-side trigger: stream the Mix's track audio from the vault, compute a duration-derived + // high-res waveform, store it in the track-waveforms vault, and set MixMetadata.WaveformEntryKey. + // 404 when the release is missing or has no stored audio; 500 on compute/storage failure. Declared + // before "{id:long}". [ApiKeyAuthorize] [HttpPost("{id:long}/mix/waveform")] public async Task GenerateMixWaveform(long id, CancellationToken ct = default) diff --git a/DeepDrftAPI/Controllers/TrackController.cs b/DeepDrftAPI/Controllers/TrackController.cs index a277cbb..89e7a9d 100644 --- a/DeepDrftAPI/Controllers/TrackController.cs +++ b/DeepDrftAPI/Controllers/TrackController.cs @@ -756,15 +756,18 @@ public class TrackController : ControllerBase [HttpPost("{trackId}/waveform")] public async Task GenerateWaveform(string trackId) { - var audio = await _trackContentService.GetAudioBinaryAsync(trackId); - if (audio is null) + // Streaming compute (Wave 2): the WAV is read from the vault in bounded chunks, never buffered + // whole. Tri-state: null = no vault audio (404), false = present but uncomputable / write failed + // (500), true = stored. + var stored = await _waveformProfileService.ComputeAndStoreProfileStreamingAsync( + _ => _trackContentService.OpenAudioStreamAsync(trackId), trackId, HttpContext.RequestAborted); + if (stored is null) { _logger.LogWarning("GenerateWaveform: no audio in vault for {TrackId}", trackId); return NotFound(); } - var stored = await _waveformProfileService.ComputeAndStoreAsync(audio.Buffer, trackId); - if (!stored) + if (stored is false) { _logger.LogError("GenerateWaveform: profile computation/storage failed for {TrackId}", trackId); return StatusCode(500, "Failed to generate waveform profile."); @@ -784,16 +787,27 @@ public class TrackController : ControllerBase [HttpPost("{trackId}/waveform/high-res")] public async Task GenerateHighResWaveform(string trackId) { - var audio = await _trackContentService.GetAudioBinaryAsync(trackId); - if (audio is null) + // The high-res bucket count is duration-derived. Read the duration from the vault index metadata + // (no body load); its absence means the track has no vault audio → 404. + var duration = await _trackContentService.GetAudioDurationAsync(trackId); + if (duration is null) { _logger.LogWarning("GenerateHighResWaveform: no audio in vault for {TrackId}", trackId); return NotFound(); } - var stored = await _waveformProfileService.ComputeAndStoreHighResAsync( - audio.Buffer, trackId, audio.Duration); - if (!stored) + // Streaming compute (Wave 2): bounded read of the vault WAV. Tri-state mapping as in + // GenerateWaveform — null (entry vanished between the metadata read and the compute) → 404. + var stored = await _waveformProfileService.ComputeAndStoreHighResStreamingAsync( + _ => _trackContentService.OpenAudioStreamAsync(trackId), trackId, duration.Value, + HttpContext.RequestAborted); + if (stored is null) + { + _logger.LogWarning("GenerateHighResWaveform: no audio in vault for {TrackId}", trackId); + return NotFound(); + } + + if (stored is false) { _logger.LogError("GenerateHighResWaveform: computation/storage failed for {TrackId}", trackId); return StatusCode(500, "Failed to generate high-res waveform datum."); diff --git a/DeepDrftAPI/Services/UnifiedReleaseService.cs b/DeepDrftAPI/Services/UnifiedReleaseService.cs index 93ff858..27d0796 100644 --- a/DeepDrftAPI/Services/UnifiedReleaseService.cs +++ b/DeepDrftAPI/Services/UnifiedReleaseService.cs @@ -143,8 +143,9 @@ public class UnifiedReleaseService return Result.CreateFailResult(MixHasNoTrackMessage); } - var audio = await _trackContentService.GetAudioBinaryAsync(entryKey); - if (audio is null) + // Duration from the vault index metadata (no body load); its absence means no vault audio. + var duration = await _trackContentService.GetAudioDurationAsync(entryKey); + if (duration is null) { _logger.LogWarning("TriggerMixWaveform: no audio in vault for {EntryKey} (release {ReleaseId})", entryKey, releaseId); return Result.CreateFailResult(MixTrackNoAudioMessage); @@ -152,10 +153,18 @@ public class UnifiedReleaseService // Duration-derived, constant-time-resolution capture (≈333 samples/sec) so long mixes are not // under-sampled by a fixed bucket count — see WaveformResolution / spec §F. Same per-track - // high-res datum every track now carries (phase-12 §5). - var computed = await _waveformProfileService.ComputeAndStoreHighResAsync( - audio.Buffer, entryKey, audio.Duration); - if (!computed) + // high-res datum every track now carries (phase-12 §5). Streamed from the vault in bounded + // chunks (Wave 2): a ~GB mix is never buffered whole. Tri-state — null = entry vanished after + // the metadata read; false = uncomputable / write failed. + var computed = await _waveformProfileService.ComputeAndStoreHighResStreamingAsync( + _ => _trackContentService.OpenAudioStreamAsync(entryKey), entryKey, duration.Value, ct); + if (computed is null) + { + _logger.LogWarning("TriggerMixWaveform: no audio in vault for {EntryKey} (release {ReleaseId})", entryKey, releaseId); + return Result.CreateFailResult(MixTrackNoAudioMessage); + } + + if (computed is false) { _logger.LogError("TriggerMixWaveform: waveform computation/storage failed for {EntryKey}", entryKey); return Result.CreateFailResult("Failed to compute the Mix waveform."); diff --git a/DeepDrftAPI/Services/UnifiedTrackService.cs b/DeepDrftAPI/Services/UnifiedTrackService.cs index 50547be..4023955 100644 --- a/DeepDrftAPI/Services/UnifiedTrackService.cs +++ b/DeepDrftAPI/Services/UnifiedTrackService.cs @@ -279,8 +279,8 @@ public class UnifiedTrackService // The old waveform no longer matches the new bytes. Regenerate both datums in place, keyed by // the same EntryKey (the re-run overwrites the stale data). The store path no longer hands back // a buffer, so the waveform compute re-reads the freshly stored audio from the vault — the same - // path the upload uses. That re-read is whole-file (Wave 2, still unbounded by design); the - // store itself is now streamed. Best-effort throughout: a datum failure never fails the replace. + // path the upload uses. That re-read is now a bounded streaming pass (Wave 2); neither the store + // nor the compute holds the whole file. Best-effort throughout: a datum failure never fails the replace. await TryStoreWaveformDatumsAsync(entryKey, ct); // Write the new duration to SQL. The vault bytes are already swapped, so this is the @@ -302,15 +302,16 @@ public class UnifiedTrackService // Compute and store both waveform datums for a freshly uploaded track: the fixed 512-bucket profile // the player-bar seeker consumes, and the duration-derived high-res datum the lava visualizer - // consumes (phase-12 §5 — every track now carries one, computed at upload). Both source the same - // audio: read it back from the vault once (the authoritative parsed duration + the stored buffer) - // rather than re-reading and re-parsing the temp file. Best-effort throughout — never fails upload. + // consumes (phase-12 §5 — every track now carries one, computed at upload). Both are reduced in a + // SINGLE streaming pass over the vault audio (Wave 2): the duration comes from the vault index + // metadata (no body load) and the PCM is streamed in bounded chunks through two accumulators, so a + // ~GB mix never lands its whole body in a managed byte[]. Best-effort throughout — never fails upload. private async Task TryStoreWaveformDatumsAsync(string entryKey, CancellationToken ct) { try { - var audio = await _contentTrackContentService.GetAudioBinaryAsync(entryKey); - if (audio is null) + var duration = await _contentTrackContentService.GetAudioDurationAsync(entryKey); + if (duration is null) { _logger.LogWarning( "Waveform datum step: no audio in vault for {EntryKey} immediately after store; skipping.", @@ -318,8 +319,8 @@ public class UnifiedTrackService return; } - await _waveformProfileService.ComputeAndStoreAsync(audio.Buffer, entryKey); - await _waveformProfileService.ComputeAndStoreHighResAsync(audio.Buffer, entryKey, audio.Duration); + await _waveformProfileService.ComputeAndStoreAllStreamingAsync( + _ => _contentTrackContentService.OpenAudioStreamAsync(entryKey), entryKey, duration.Value, ct); } catch (Exception ex) when (ex is not OperationCanceledException) { @@ -350,8 +351,11 @@ public class UnifiedTrackService { ct.ThrowIfCancellationRequested(); - var audio = await _contentTrackContentService.GetAudioBinaryAsync(track.EntryKey); - if (audio is null) + // Read the duration from the vault index metadata (no audio body load) — the same value the + // processor wrote at upload. Bounds this admin path too (Wave 2): a backfill over a catalogue + // of long mixes no longer pulls each whole file into memory just to read its runtime. + var duration = await _contentTrackContentService.GetAudioDurationAsync(track.EntryKey); + if (duration is null) { _logger.LogWarning("BackfillDurationsAsync: no vault audio for {EntryKey} (track {Id}); skipping.", track.EntryKey, track.Id); @@ -359,7 +363,7 @@ public class UnifiedTrackService continue; } - var write = await _sqlTrackService.UpdateDuration(track.Id, audio.Duration, ct); + var write = await _sqlTrackService.UpdateDuration(track.Id, duration.Value, ct); if (!write.Success) { var error = write.Messages.FirstOrDefault()?.Message ?? "Unknown error"; diff --git a/DeepDrftContent/Processors/AudioProcessor.cs b/DeepDrftContent/Processors/AudioProcessor.cs index 8619ebe..bf2d47f 100644 --- a/DeepDrftContent/Processors/AudioProcessor.cs +++ b/DeepDrftContent/Processors/AudioProcessor.cs @@ -288,6 +288,105 @@ public class AudioProcessor return new PcmData(pcm, metadata.Channels, metadata.SampleRate, metadata.BitsPerSample); } + /// + /// Reads only the WAV header region from (a bounded window, never the + /// audio body) and returns where the PCM data region begins, how long it is, and the format + /// parameters needed to decode it — the streaming counterpart of . The + /// data length is clamped against (the true backing-file size), + /// so the caller streams exactly the present PCM. Returns null for the same inputs + /// rejects — non-WAV bytes (mp3/flac), float, and padded-container + /// EXTENSIBLE — so the caller treats null as "no profile computable" and continues gracefully. + /// + /// must be positioned at the start; on return its position is past the + /// header window (the caller seeks to DataStart before streaming the body). No whole-file + /// buffer is allocated — peak memory is the bounded header window. + /// + public async Task TryReadPcmStreamInfoAsync( + Stream stream, long totalFileLength, CancellationToken cancellationToken = default) + { + var window = await ReadWavHeaderWindowAsync(stream, cancellationToken); + if (window is null) + { + return null; + } + + var validation = ValidateWavStructure(window); + if (!validation.IsValid || validation.IsFloat) + { + return null; + } + + WavMetadata metadata; + try + { + metadata = ParseWavMetadata(window, validation); + ValidateAudioParameters(metadata); + if (metadata.IsPaddedContainer) + { + return null; + } + } + catch + { + return null; + } + + long dataStart = validation.DataChunkPos + 8; + if (dataStart > totalFileLength) + { + return null; + } + + var available = totalFileLength - dataStart; + var dataLength = Math.Min((long)metadata.DataSize, available); + if (dataLength <= 0) + { + return null; + } + + return new WavPcmStreamInfo( + dataStart, dataLength, metadata.Channels, metadata.SampleRate, metadata.BitsPerSample); + } + + /// + /// Reads enough of to contain the fmt chunk and the data chunk's 8-byte + /// header, growing in 64 KB steps until the data chunk is locatable or EOF / the + /// is reached. Bails after the first read when the bytes are not a + /// RIFF/WAVE container, so a non-WAV stream (mp3/flac) costs one read, not the full cap. Returns + /// null only when nothing could be read. + /// + private static async Task ReadWavHeaderWindowAsync(Stream stream, CancellationToken ct) + { + using var ms = new MemoryStream(); + var buffer = new byte[HeaderWindowStep]; + while (ms.Length < HeaderWindowCap) + { + var read = await stream.ReadAsync(buffer, ct); + if (read == 0) + break; + ms.Write(buffer, 0, read); + + var soFar = ms.ToArray(); + + // Early-out for non-WAV input: once at least the 12-byte RIFF/WAVE preamble is in hand, + // a missing signature means this will never be a WAV — stop rather than read to the cap. + if (soFar.Length >= 12 && !HasRiffWaveSignature(soFar)) + return soFar; + + // FindChunk returns -1 until the data chunk header is fully in the window; on a normal + // file it sits within the first 64 KB so this loop runs exactly once. + if (FindChunk(soFar, "data") >= 0) + return soFar; + } + + return ms.Length > 0 ? ms.ToArray() : null; + } + + private static bool HasRiffWaveSignature(byte[] buffer) => + buffer.Length >= 12 + && System.Text.Encoding.ASCII.GetString(buffer, 0, 4) == "RIFF" + && System.Text.Encoding.ASCII.GetString(buffer, 8, 4) == "WAVE"; + /// /// Extracts metadata from WAV file buffer with comprehensive validation /// @@ -698,4 +797,21 @@ public readonly record struct PcmData( ReadOnlyMemory Pcm, int Channels, int SampleRate, + int BitsPerSample); + +/// +/// Where a WAV's PCM data region lives and how to decode it, without the bytes themselves — the +/// streaming counterpart of . The caller seeks to and +/// streams exactly bytes through a loudness accumulator. +/// +/// Absolute byte offset of the first PCM sample (past the data chunk header). +/// PCM region length in bytes, clamped to what the backing file actually holds. +/// Number of interleaved channels. +/// Samples per second. +/// Bit depth per sample (8, 16, 24, or 32). +public readonly record struct WavPcmStreamInfo( + long DataStart, + long DataLength, + int Channels, + int SampleRate, int BitsPerSample); \ No newline at end of file diff --git a/DeepDrftContent/Processors/ILoudnessAlgorithm.cs b/DeepDrftContent/Processors/ILoudnessAlgorithm.cs index 7b2de43..66ecfb0 100644 --- a/DeepDrftContent/Processors/ILoudnessAlgorithm.cs +++ b/DeepDrftContent/Processors/ILoudnessAlgorithm.cs @@ -20,4 +20,46 @@ public interface ILoudnessAlgorithm /// is 1. All zeros when the signal is silent (peak is 0) or no samples are present. /// double[] Compute(ReadOnlySpan pcmData, int channels, int sampleRate, int bitsPerSample, int bucketCount); + + /// + /// Creates a stateful accumulator that reduces the same loudness profile from PCM fed in bounded + /// chunks rather than from one contiguous buffer. The streaming waveform path uses this so a long + /// track's PCM is never materialized whole in a managed byte[]. The accumulator's output is + /// byte-identical to for the same total PCM, because is + /// itself defined in terms of one — the single source of truth for the loudness reduction. + /// + /// + /// Total length of the PCM data region in bytes. Required up front because the bucket each frame + /// lands in is derived from the frame's position relative to the total frame count. + /// + /// Number of interleaved channels; averaged to mono per frame. + /// Samples per second (used for the envelope-smoothing time base). + /// Bit depth (8 unsigned, 16/24/32 signed) used to decode samples. + /// Number of equal time slices to reduce the signal to. + ILoudnessAccumulator CreateAccumulator( + long pcmByteLength, int channels, int sampleRate, int bitsPerSample, int bucketCount); +} + +/// +/// Stateful, single-pass reducer for one loudness profile. Frames are fed via in +/// arbitrary (non-frame-aligned) chunks — a partial frame straddling a chunk boundary is carried +/// internally — and emits the peak-normalized double[bucketCount]. Not +/// thread-safe; feed one stream sequentially. Reusable across the same stream's chunks only, not +/// across streams. +/// +public interface ILoudnessAccumulator +{ + /// + /// Feeds the next run of PCM bytes (interleaved, little-endian). Need not be frame-aligned; bytes + /// that do not complete a frame are retained until the next call. Bytes past the total frame count + /// declared at construction are ignored, matching the whole-buffer path's trailing-partial-frame drop. + /// + void Add(ReadOnlySpan pcmChunk); + + /// + /// Finalizes and returns the peak-normalized loudness profile (double[bucketCount], each in + /// [0, 1]). All zeros for silence or a degenerate (no-frame) input. Call once, after the last + /// . + /// + double[] Finish(); } diff --git a/DeepDrftContent/Processors/RmsLoudnessAlgorithm.cs b/DeepDrftContent/Processors/RmsLoudnessAlgorithm.cs index 6f4cb6e..bb3e056 100644 --- a/DeepDrftContent/Processors/RmsLoudnessAlgorithm.cs +++ b/DeepDrftContent/Processors/RmsLoudnessAlgorithm.cs @@ -18,100 +18,27 @@ public class RmsLoudnessAlgorithm : ILoudnessAlgorithm /// public const double SmoothingTimeConstantSeconds = 0.005; + /// + /// Whole-buffer reduction. Defined in terms of so the streaming and + /// whole-buffer paths share one decode + finalize implementation — byte-identical output by + /// construction, not by parallel maintenance. + /// public double[] Compute(ReadOnlySpan pcmData, int channels, int sampleRate, int bitsPerSample, int bucketCount) + { + var accumulator = CreateAccumulator(pcmData.Length, channels, sampleRate, bitsPerSample, bucketCount); + accumulator.Add(pcmData); + return accumulator.Finish(); + } + + public ILoudnessAccumulator CreateAccumulator( + long pcmByteLength, int channels, int sampleRate, int bitsPerSample, int bucketCount) { if (bucketCount <= 0) { throw new ArgumentOutOfRangeException(nameof(bucketCount), "Bucket count must be positive."); } - var result = new double[bucketCount]; - - if (channels <= 0) - { - return result; - } - - var bytesPerSample = bitsPerSample / 8; - if (bytesPerSample <= 0) - { - return result; - } - - var bytesPerFrame = bytesPerSample * channels; - var frameCount = pcmData.Length / bytesPerFrame; - if (frameCount == 0) - { - return result; - } - - // Sum of squared mono amplitudes and the frame count, per bucket. A frame's bucket is - // determined by its position in the timeline so buckets are equal-duration slices. - var sumSquares = new double[bucketCount]; - var counts = new long[bucketCount]; - - for (var frame = 0; frame < frameCount; frame++) - { - var frameStart = frame * bytesPerFrame; - - double channelSum = 0; - for (var ch = 0; ch < channels; ch++) - { - var sampleStart = frameStart + ch * bytesPerSample; - channelSum += ReadSampleNormalized(pcmData, sampleStart, bitsPerSample); - } - - var mono = channelSum / channels; - - // long math avoids overflow on large files before the divide back into bucket index. - var bucket = (int)((long)frame * bucketCount / frameCount); - if (bucket >= bucketCount) - { - bucket = bucketCount - 1; - } - - sumSquares[bucket] += mono * mono; - counts[bucket]++; - } - - for (var i = 0; i < bucketCount; i++) - { - if (counts[i] > 0) - { - result[i] = Math.Sqrt(sumSquares[i] / counts[i]); - } - } - - // Envelope smoothing (~15 ms): round the spikey per-bucket RMS into a smooth contour before - // peak-normalization, so the rendered ribbon reads as a continuous curve, not faceted polygons. - // Each bucket spans (totalSeconds / bucketCount) of audio; the filter coefficient is derived - // from that against the time constant so the smoothing is duration-aware, not a fixed window. - var totalSeconds = (double)frameCount / sampleRate; - var bucketSeconds = totalSeconds / bucketCount; - SmoothEnvelope(result, bucketSeconds); - - var peak = 0.0; - for (var i = 0; i < bucketCount; i++) - { - if (result[i] > peak) - { - peak = result[i]; - } - } - - if (peak <= 0) - { - // Silence — return all zeros (Array is already zero-initialized). - Array.Clear(result); - return result; - } - - for (var i = 0; i < bucketCount; i++) - { - result[i] /= peak; - } - - return result; + return new RmsLoudnessAccumulator(pcmByteLength, channels, sampleRate, bitsPerSample, bucketCount); } /// @@ -122,7 +49,7 @@ public class RmsLoudnessAlgorithm : ILoudnessAlgorithm /// each bucket blends (1 − a) of itself with a of the running envelope. A near-zero /// or non-finite bucket duration leaves the data untouched (nothing to smooth meaningfully). /// - private static void SmoothEnvelope(double[] data, double bucketSeconds) + internal static void SmoothEnvelope(double[] data, double bucketSeconds) { if (data.Length < 2 || bucketSeconds <= 0 || !double.IsFinite(bucketSeconds)) { @@ -154,7 +81,7 @@ public class RmsLoudnessAlgorithm : ILoudnessAlgorithm /// Decodes one PCM sample at to a normalized amplitude in [-1, 1]. /// 8-bit is unsigned (0..255, centered at 128); 16/24/32-bit are signed little-endian. /// - private static double ReadSampleNormalized(ReadOnlySpan data, int offset, int bitsPerSample) + internal static double ReadSampleNormalized(ReadOnlySpan data, int offset, int bitsPerSample) { switch (bitsPerSample) { @@ -194,3 +121,167 @@ public class RmsLoudnessAlgorithm : ILoudnessAlgorithm } } } + +/// +/// Single-pass RMS accumulator backing . Frames are fed via +/// in arbitrary chunks; a partial frame straddling a chunk boundary is carried in a +/// one-frame buffer. The per-frame decode, bucket assignment, and per-bucket accumulation are the exact +/// arithmetic the former whole-buffer loop used, in the same frame order, so the floating-point result +/// is bit-identical whether the PCM arrives in one span or many. applies the same +/// envelope smoothing and peak-normalization as before. Memory is O(bucketCount + one frame). +/// +public sealed class RmsLoudnessAccumulator : ILoudnessAccumulator +{ + private readonly int _channels; + private readonly int _sampleRate; + private readonly int _bitsPerSample; + private readonly int _bucketCount; + private readonly int _bytesPerSample; + private readonly int _bytesPerFrame; + private readonly long _frameCount; + + private readonly double[] _sumSquares; + private readonly long[] _counts; + private readonly byte[] _carry; + private int _carryLen; + private long _frameIndex; + + internal RmsLoudnessAccumulator(long pcmByteLength, int channels, int sampleRate, int bitsPerSample, int bucketCount) + { + _channels = channels; + _sampleRate = sampleRate; + _bitsPerSample = bitsPerSample; + _bucketCount = bucketCount; + _sumSquares = new double[bucketCount]; + _counts = new long[bucketCount]; + + // Guards mirror the former whole-buffer Compute exactly: any degenerate parameter leaves + // _frameCount at 0, so Add is a no-op and Finish returns the zero-initialized profile. + _bytesPerSample = bitsPerSample / 8; + if (channels <= 0 || _bytesPerSample <= 0) + { + _bytesPerFrame = 0; + _frameCount = 0; + _carry = []; + return; + } + + _bytesPerFrame = _bytesPerSample * channels; + _frameCount = pcmByteLength / _bytesPerFrame; + _carry = new byte[_bytesPerFrame]; + } + + public void Add(ReadOnlySpan pcmChunk) + { + if (_frameIndex >= _frameCount) + { + return; // degenerate input, or every expected frame already consumed + } + + var pos = 0; + + // Complete a frame carried from the previous chunk first. + if (_carryLen > 0) + { + var need = _bytesPerFrame - _carryLen; + var take = Math.Min(need, pcmChunk.Length); + pcmChunk.Slice(0, take).CopyTo(_carry.AsSpan(_carryLen)); + _carryLen += take; + pos += take; + + if (_carryLen < _bytesPerFrame) + { + return; // still not a full frame + } + + ProcessFrame(_carry); + _carryLen = 0; + if (_frameIndex >= _frameCount) + { + return; + } + } + + // Whole frames directly from the chunk. + while (pos + _bytesPerFrame <= pcmChunk.Length && _frameIndex < _frameCount) + { + ProcessFrame(pcmChunk.Slice(pos, _bytesPerFrame)); + pos += _bytesPerFrame; + } + + // Stash a trailing partial frame for the next chunk — but only while frames are still expected. + // A trailing partial frame on the final chunk is dropped, matching the whole-buffer path. + if (_frameIndex < _frameCount && pos < pcmChunk.Length) + { + var remainder = pcmChunk.Slice(pos); + remainder.CopyTo(_carry); + _carryLen = remainder.Length; + } + } + + private void ProcessFrame(ReadOnlySpan frame) + { + double channelSum = 0; + for (var ch = 0; ch < _channels; ch++) + { + channelSum += RmsLoudnessAlgorithm.ReadSampleNormalized(frame, ch * _bytesPerSample, _bitsPerSample); + } + + var mono = channelSum / _channels; + + // long math avoids overflow on large files before the divide back into bucket index. + var bucket = (int)(_frameIndex * _bucketCount / _frameCount); + if (bucket >= _bucketCount) + { + bucket = _bucketCount - 1; + } + + _sumSquares[bucket] += mono * mono; + _counts[bucket]++; + _frameIndex++; + } + + public double[] Finish() + { + var result = new double[_bucketCount]; + if (_frameCount == 0) + { + return result; // degenerate input — all zeros, as the whole-buffer guards returned + } + + for (var i = 0; i < _bucketCount; i++) + { + if (_counts[i] > 0) + { + result[i] = Math.Sqrt(_sumSquares[i] / _counts[i]); + } + } + + // Envelope smoothing (~15 ms) then peak-normalization — identical to the whole-buffer finalize. + var totalSeconds = (double)_frameCount / _sampleRate; + var bucketSeconds = totalSeconds / _bucketCount; + RmsLoudnessAlgorithm.SmoothEnvelope(result, bucketSeconds); + + var peak = 0.0; + for (var i = 0; i < _bucketCount; i++) + { + if (result[i] > peak) + { + peak = result[i]; + } + } + + if (peak <= 0) + { + Array.Clear(result); + return result; + } + + for (var i = 0; i < _bucketCount; i++) + { + result[i] /= peak; + } + + return result; + } +} diff --git a/DeepDrftContent/Processors/WaveformProfileService.cs b/DeepDrftContent/Processors/WaveformProfileService.cs index e4d9c24..8306f66 100644 --- a/DeepDrftContent/Processors/WaveformProfileService.cs +++ b/DeepDrftContent/Processors/WaveformProfileService.cs @@ -17,6 +17,10 @@ public class WaveformProfileService { private const string ProfileExtension = ".wfp"; + /// Bounded read-buffer size for the streaming PCM pass — the only filesize-independent + /// allocation on the streaming path (matches the store path's 80 KB copy buffer). + private const int StreamReadBufferSize = 81920; + private readonly FileDb _fileDatabase; private readonly AudioProcessor _audioProcessor; private readonly ILoudnessAlgorithm _loudnessAlgorithm; @@ -117,6 +121,161 @@ public class WaveformProfileService return ComputeAndStoreAsync(wavBytes, entryKey, bucketCount, VaultConstants.TrackWaveforms); } + /// + /// Streaming counterpart of : computes and stores the fixed + /// 512-bucket player-bar profile by reading the WAV from in bounded + /// chunks, never materializing the whole file in a managed byte[]. Tri-state result matches + /// the RemoveResourceAsync idiom so callers can map outcomes precisely: null = no audio + /// stream available (the entry has no backing audio); false = audio present but no profile + /// computable (non-WAV / float / padded) or the vault write failed; true = stored. Output is + /// byte-identical to the whole-buffer path for the same WAV. + /// + public Task ComputeAndStoreProfileStreamingAsync( + Func> openWavStream, + string entryKey, + CancellationToken ct = default) => + RunStreamingAsync( + openWavStream, entryKey, + [(_options.BucketCount, VaultConstants.WaveformProfiles)], ct); + + /// + /// Streaming counterpart of : computes and stores the + /// duration-derived high-res datum () by streaming the WAV + /// from . drives the bucket count + /// exactly as the whole-buffer path's audio.Duration did — pass the same vault-metadata + /// duration to keep the stored bytes identical. Tri-state result as in + /// . + /// + public Task ComputeAndStoreHighResStreamingAsync( + Func> openWavStream, + string entryKey, + double durationSeconds, + CancellationToken ct = default) => + RunStreamingAsync( + openWavStream, entryKey, + [(WaveformResolution.BucketCountForDuration(durationSeconds), VaultConstants.TrackWaveforms)], ct); + + /// + /// Computes and stores BOTH datums a track carries — the 512-bucket profile and the duration-derived + /// high-res datum — from a SINGLE streaming pass over the WAV. One sequential read of the (possibly + /// ~GB) audio feeds two independent accumulators, so memory stays O(bucket arrays + read buffer) and + /// disk I/O is halved versus two separate passes. This is the upload / replace-audio hot path. Each + /// datum's stored bytes are byte-identical to its whole-buffer counterpart. Tri-state: null = + /// no audio stream; false = not WAV-decodable or a vault write failed; true = both + /// datums stored. Best-effort callers ignore the result. + /// + public Task ComputeAndStoreAllStreamingAsync( + Func> openWavStream, + string entryKey, + double durationSeconds, + CancellationToken ct = default) => + RunStreamingAsync( + openWavStream, entryKey, + [ + (_options.BucketCount, VaultConstants.WaveformProfiles), + (WaveformResolution.BucketCountForDuration(durationSeconds), VaultConstants.TrackWaveforms), + ], + ct); + + /// + /// Core streaming reduction: opens the WAV once, parses its header (bounded), then streams the PCM + /// data region through one loudness accumulator per requested target, storing each datum. All + /// targets are computed in the single pass. See the tri-state contract on the public wrappers. + /// + private async Task RunStreamingAsync( + Func> openWavStream, + string entryKey, + IReadOnlyList<(int BucketCount, string VaultName)> targets, + CancellationToken ct) + { + try + { + await using var stream = await openWavStream(ct); + if (stream is null) + { + // No backing audio for this entry — distinct from "present but undecodable". + return null; + } + + var info = await _audioProcessor.TryReadPcmStreamInfoAsync(stream, stream.Length, ct); + if (info is null) + { + _logger.LogWarning( + "Waveform profile not computed for {EntryKey}: WAV PCM could not be extracted (streaming).", + entryKey); + return false; + } + + var v = info.Value; + var accumulators = new ILoudnessAccumulator[targets.Count]; + for (var i = 0; i < targets.Count; i++) + { + accumulators[i] = _loudnessAlgorithm.CreateAccumulator( + v.DataLength, v.Channels, v.SampleRate, v.BitsPerSample, targets[i].BucketCount); + } + + await StreamPcmThroughAsync(stream, v.DataStart, v.DataLength, accumulators, ct); + + _logger.LogInformation( + "Streaming waveform compute for {EntryKey}: {DataLength} PCM bytes, {TargetCount} datum(s), " + + "{BufferSize}B read buffer — no whole-file load.", + entryKey, v.DataLength, targets.Count, StreamReadBufferSize); + + var allStored = true; + for (var i = 0; i < targets.Count; i++) + { + var profile = accumulators[i].Finish(); + var quantized = Quantize(profile); + + await EnsureVaultAsync(targets[i].VaultName); + var binary = new MediaBinary(new MediaBinaryParams(quantized, quantized.Length, ProfileExtension)); + var stored = await _fileDatabase.RegisterResourceAsync(targets[i].VaultName, entryKey, binary); + if (!stored) + { + _logger.LogWarning( + "Waveform vault write failed for {EntryKey} in {VaultName}.", entryKey, targets[i].VaultName); + allStored = false; + } + } + + return allStored; + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + _logger.LogError(ex, "Streaming waveform computation failed for {EntryKey}.", entryKey); + return false; + } + } + + /// + /// Seeks to the PCM data region and streams exactly bytes through each + /// accumulator in bounded reads. The accumulators carry partial frames internally, so the read + /// boundaries need not align to frames. Peak memory is one read buffer — independent of file size. + /// + private static async Task StreamPcmThroughAsync( + Stream stream, long dataStart, long dataLength, ILoudnessAccumulator[] accumulators, CancellationToken ct) + { + stream.Seek(dataStart, SeekOrigin.Begin); + + var buffer = new byte[StreamReadBufferSize]; + var remaining = dataLength; + while (remaining > 0) + { + var want = (int)Math.Min(buffer.Length, remaining); + var read = await stream.ReadAsync(buffer.AsMemory(0, want), ct); + if (read == 0) + break; + + var span = buffer.AsSpan(0, read); + foreach (var accumulator in accumulators) + { + accumulator.Add(span); + } + + remaining -= read; + } + } + /// /// Returns the stored quantized profile bytes for a track from /// (defaults to when null), or null if no profile diff --git a/DeepDrftContent/TrackContentService.cs b/DeepDrftContent/TrackContentService.cs index decd24e..b7894d3 100644 --- a/DeepDrftContent/TrackContentService.cs +++ b/DeepDrftContent/TrackContentService.cs @@ -201,6 +201,45 @@ public class TrackContentService return await _fileDatabase.LoadResourceAsync(VaultConstants.Tracks, trackId); } + /// + /// Opens a read-only, seekable stream over a track's vault audio, or null if the entry has no + /// backing file. The caller owns the stream and must dispose it. Unlike + /// this never buffers the whole file — it is the source for the streaming waveform compute. Follows + /// the FileDatabase swallow-and-return-null contract. + /// + /// Track ID (EntryKey) + public async Task OpenAudioStreamAsync(string trackId) + { + var vault = _fileDatabase.GetVault(VaultConstants.Tracks); + if (vault is null) + { + return null; + } + + var media = await vault.GetEntryStreamAsync(trackId); + return media?.Stream; + } + + /// + /// Reads a track's stored audio duration from the vault index metadata WITHOUT loading the audio + /// body — the cheap counterpart of GetAudioBinaryAsync(...).Duration. Returns null if the + /// entry is unknown or carries no audio metadata. The streaming high-res waveform path uses this to + /// derive the duration-based bucket count, matching the value the whole-buffer path read off + /// so the stored datum is byte-identical. + /// + /// Track ID (EntryKey) + public async Task GetAudioDurationAsync(string trackId) + { + var vault = _fileDatabase.GetVault(VaultConstants.Tracks); + if (vault is null) + { + return null; + } + + var metaData = await vault.GetEntryMetadata(trackId); + return metaData is AudioMetaData audio ? audio.Duration : null; + } + /// /// Checks if FileDatabase is available and tracks vault exists /// diff --git a/DeepDrftTests/WaveformStreamingParityTests.cs b/DeepDrftTests/WaveformStreamingParityTests.cs new file mode 100644 index 0000000000000000000000000000000000000000..19e0329ef178a96711d9746aafe5dbf36ec0165f GIT binary patch literal 14480 zcmdU0S#KN35za%N^eZ}g8rdXuSauvqRzO|50&Ll^m~{Zd5SlYhaiBSv=^4q~g@Zih z2L${?vTb*a+9NPR5#*D8K=qSWl9n3TtP zR;sKV;8T|}EA_GkZ;CurTIYpd@hT3BT<4Q=aQYEv=om)~Ugp==067?rby>);q=)Xv zg&SVaT5HbYM4gPq3(-Z8U&vC9rB;JWRouovXV84QzqXdiROy)vmES&6nrK?v+}sog zx=K@7%n$k7J9(?bj+n_JF6W}8bz+hiqMRxL+?7m;b$3{Wr>O|@bXJvWAV%~auM#EF zT$dtFXL&)a6SE?psiK_k(HRV*K4&J&sR+h-l|?ENI6cxmk*Ql{0Nzb=oHqgSBF=;q zsm$ax7LMnodjDZVq!l3PGA|TA(&GH&^kQ^=JbcM;%AC=j6)_0lv-;y|GQr^t*OLYI zhGlqGgg1(*U_4|Nc{-x1kaSJ`NEPud@S4Wg(|)0}h-6u% z%oMFG6D_AKMmZnGfbQg6i`%N7lH(oMDNlG;~N%Lmgu!#)&%}=;kUA6c_m@J zA}KzJ3#bnykJRuqot1NlKP`TLsmediKA4RL&}|+^f*%JP{M;N%s~cy%f$YLSq~kbK z0e|^Hl_QnT=>K4&C)OWAb-gEksN!gFq3-B2*cgm*kOb_4jg5VVO7MWT3L)V0!7;R> zRQ`M5FtLk2S!zZ~7AN^#cAp^k<{nKYB<7a}q!X2>Qh5M9TyR(w8WIi_E-JOLFFtt$ zLs^DXY~Oq&j=<)=n&5Iu+?A49HDi^6M3IJ0WSrnSwRp5?_gkFn>w-3g{(@mzNlC0x z6NY~KEjTjh9D1}ebr<+A#K`8#Uh55(ieVMSxwqxGyn8F&rut=GMH!6! zFuBfQ9j9q`oi*C_sscLLiGU@>uD}(yAzVNo$w#Q-ptHjgd^k8~PZWat=B8n}`}8M_ z{O>I?R=mIqhAvw|A@w{9#YpNK;aKfd>T3JhQCwb@s!;jW5sdA51d|YlG8yIX$R;1L z8NDY-x3UlsxhZ)k<5HMQ8k`k*IwW9Tq#6QFTVUYGZ((2ilB4nOIxFwwEmj>>aT2`= zQ?M#9mgy~2V^8evZf|Y%Lp#kC1UCjleViu=ECdX2TcXxsOsunB;KkT+j*Zvs$L?~4b(k$z zY5W%Zxpj+XZELBZ&w z(aTez^Qs6D=TPWD0(HjT>xFm*xT*+a4a1g4k*FQ0q&SdVCfM8vaLgCPTLTa2zpXaU zp#~4D&CYJEHoO0QwfS02m>HMCK_1^c$&r_asIN&HITT#1ARPs`XM)@DpRvy?nBN}} zHojHbYwZ7|`3)%*5>bv2NeLJE!YuxNu(dD1i9=E!#M#;e-8O6vA^AuKw76DTg+K}V z>Gry)xupi*)W8RER&E6B6~rSCm%=L#(>N+?xG9<=<37xyOG+y2y%^&T%n;JSIUsU# zBs9!6>9F@4-dnhX5gA&KVGgtB{3H<*`y6vHLU?`yr^fNEZlcLvl2D579laQ5FG~iV zr*8lOsueBH*jb3aPzNOzBCW+xY73|=kTEYu7cpb}GLW$Y4-Ux(K^dOiBlB&?dU>^$Sd$;J*dsr5jdO2qg~d&=QH zoT=wIz;xmTXuK*=KtK*@T=IAFbjvK%Q&=dEE@{&Ll#%ILmxfLG*2c-2pwH_kS-A!t z*0x!5xZ3ufSXsAMH*QVk_}+$XW#(Dq*5RLdx-tvD+CsjvRhA%V<->IagFHanPA;>i z_g9idwS4GWRr|P;l8E&M$%m;eHU(@kB+INWB>VUtq-m@8W-FLYD4*<=*!0CmZ$~yd zx5cxsPMTlJ;#$FdU%jO|=6Pn%J)TzCjlQ~|swOATloo&A)ET7yb$hTYUL8^Kjq)Wb zb>68Y0hVOF3P~HgBzAM?1ea-iC&w&po6pJMXQL-8#pV?=$C7XqJ(rXq;--?47Mo7rF|mD}hRHGH+@?SI;c!12FLt)Fz5p z$@=yZ+flE`UKcKMdcOUKovlvUiq2cV$ervELa5Qt5eWaKLNSnYUk(mT4WEQ=rsNGO z$XtboG+A*&(I0Wi>wv0sToGOcYeM!|Cdo177>xA~zyPraAs%cm!gXj6VO~;;0o81Z zcPu_Gx{C^lR0X-b$>4qAx~UtwiQzKKg2FqO6}qMOl{3SxJYHSo87vG%f%@tlijYcQ zP~DRF2D5{5d8W@~qQAd=xuj|gDk+>>yN1IaO7TilH$hT?)i!Csc052u*mMZqo}Zm< zLWEDz&&W#Re9<;4SO%J*@26RqN08IO@bdWle1N2i84;|XpFH{PZ?LQ{`>;TL;zoZa z=M?z%uXCmQQ&jZ&gdmzV=A)&jt4AQ+3;Fd?h@p-%EU>Mg#5byi+c&TwED*ZNQhtxr?H28!N*!4^~?&AKqZK9$sR*Iihkqea0$Q5yO zV!u&TrW-(0v7FmZv==Qvf+@4Uw{NJI-$~kqgj$y zC5nX|x&JaNMXk#VcsFXPxghcNgj)+&Cm5U1=qzqOW=I1C{cQ*|mYD!xV<4!Y#bqeXfvdk6fC8X& zwi$tXHa_h$Ng2@bQz+qJ2m(FkTY~F_U6?!hoQ!*LL>-R9P{LO=5Vv=(p6ott_uU*b z?9Fk2vL_G*J`y!QI7r%lX4_cQWhGm?EB^KOe}JD)`Xp%`Za&)lztXJ?p$03IvHS?8 zc~rzshp<}tD|RgFQ0#BDYr;Az2bp1SBBm090h0rB|#hd+w9b-UL+ z^6B5kp|It38~3&QygC0(gIZ1z@p8&Mz5c+$dECmkuTX92YLQnH5fd`~)7ZQfb1Q{rW zB*CS{PE_c06N({2Igbb+VR{1^_*Snx=*DTipbdw z5$?!dfnKa>5N2E;e;JdH4@jLfLUi=AS~FL+K9ktV6y_vBLVu*956Qq zh8=kD__4Vx5};qae9$*{u=#w1VWMZdPj{YH;MVrm_LDzs?e6S6X~3dA#FO{&2f85Q z5iW?aJ3PeTN#jzi-Jl(25}8m;wdHQBAKTbfFTJxXkKzoYC0rRQf)vBF?V5>+nbd3O ztefSltCe>H-WRmDG|DVgsabD|pxa?(@5wW8f(W+ET(orQ;Zy$8Zw{2)=39L}iz|eZURAV}ISQHYOhdJj#KR3igpSocmOeJ9y-4Mdr zngWk3n?%5sUAZ2J8PmLgA0ac(L)BKKR!>RC$ z&Wp%H%&U?=p@K6VD0u*nhLlr9R~+k6;r|_N5-UhvEvpTyOW2>6nn{FRIK<>9LT=0Jqu+m<>jaW54#r^rfC3ZftuC zl6*aX$u_SVjt@MmJ)*%Gn)bnEq8G5tezyQTeq3t=nca9I0^W^DrWOGk?XTbtFROPq zY`3L`guH+odRBAe!-NX?WTF-N7N14iP-iHsp%TtkJKI62ne1G*Zmi^!x)c>lrP!Vu3u+1Ks*_uN3HP%4YL04XIoqt|rzbhAx{HdPF?4ZZ zZFtOYM7)agt~a@kX$IR^t=g0h+>3p<#B#xNSd&|l@dYNa4S`4w7?xu^J$YTk_+1%l z$Miu1AJ&B9r(js_R@&FgDQ6>Uoj*vpXPrGN?w2u!O9mgtpJA8N0*;u{0TeYas~PBJ-<)t8cByON`0G=eB7%iS-OWTf$G!Acqlufp1@? hE#xEWMUL0*jE2M$iTsm1_paVm--2QDr?pRO{|2~k@1p