diff --git a/DeepDrftAPI/Services/UnifiedTrackService.cs b/DeepDrftAPI/Services/UnifiedTrackService.cs index 5c7ef52..50547be 100644 --- a/DeepDrftAPI/Services/UnifiedTrackService.cs +++ b/DeepDrftAPI/Services/UnifiedTrackService.cs @@ -138,7 +138,7 @@ public class UnifiedTrackService } var unpersisted = await _contentTrackContentService.AddTrackAsync( - tempFilePath, trackName, artist, album, genre, releaseDate, originalFileName: originalFileName); + tempFilePath, trackName, artist, album, genre, releaseDate, originalFileName: originalFileName, cancellationToken: ct); if (unpersisted is null) { @@ -269,31 +269,25 @@ public class UnifiedTrackService var entryKey = lookup.Value.EntryKey; - var newAudio = await _contentTrackContentService.ReplaceTrackAudioAsync(entryKey, tempFilePath); - if (newAudio is null) + var newDuration = await _contentTrackContentService.ReplaceTrackAudioAsync(entryKey, tempFilePath, ct); + if (newDuration is null) { _logger.LogWarning("ReplaceAudioAsync: content swap returned null for track {TrackId} ({EntryKey})", trackId, entryKey); return Result.CreateFailResult("Failed to process and store the replacement audio."); } - // The old waveform no longer matches the new bytes. Regenerate both datums in place; keyed - // by the same EntryKey, the re-run overwrites the stale data (proven re-runnable). The - // freshly stored buffer is the authoritative source — no re-read of the vault needed. - try - { - await _waveformProfileService.ComputeAndStoreAsync(newAudio.Buffer, entryKey); - await _waveformProfileService.ComputeAndStoreHighResAsync(newAudio.Buffer, entryKey, newAudio.Duration); - } - catch (Exception ex) when (ex is not OperationCanceledException) - { - _logger.LogError(ex, "ReplaceAudioAsync: waveform regen failed for {EntryKey}; replace unaffected.", entryKey); - } + // The old waveform no longer matches the new bytes. Regenerate both datums in place, keyed by + // the same EntryKey (the re-run overwrites the stale data). The store path no longer hands back + // a buffer, so the waveform compute re-reads the freshly stored audio from the vault — the same + // path the upload uses. That re-read is whole-file (Wave 2, still unbounded by design); the + // store itself is now streamed. Best-effort throughout: a datum failure never fails the replace. + await TryStoreWaveformDatumsAsync(entryKey, ct); // Write the new duration to SQL. The vault bytes are already swapped, so this is the // authoritative metadata update for the replace. A failure here is surfaced (unlike the // best-effort waveform regen above) because a stale DurationSeconds silently corrupts // derived aggregates (e.g. MixRuntimeSeconds on the home stats endpoint). - var durationWrite = await _sqlTrackService.SetDuration(trackId, newAudio.Duration, ct); + var durationWrite = await _sqlTrackService.SetDuration(trackId, newDuration.Value, ct); if (!durationWrite.Success) { var error = durationWrite.Messages.FirstOrDefault()?.Message ?? "Unknown error"; diff --git a/DeepDrftContent/FileDatabase/Services/FileDatabase.cs b/DeepDrftContent/FileDatabase/Services/FileDatabase.cs index 1153947..0294734 100644 --- a/DeepDrftContent/FileDatabase/Services/FileDatabase.cs +++ b/DeepDrftContent/FileDatabase/Services/FileDatabase.cs @@ -178,6 +178,42 @@ public class FileDatabase : DirectoryIndexDirectory, IDisposable return false; } + /// + /// Registers a resource by streaming its bytes into the vault, without materializing the whole + /// file in a managed byte[] (the store-path OOM fix). The caller supplies the index + /// and a callback that emits bytes to + /// the backing stream. Swallows exceptions and returns false, matching + /// 's contract — callers check the bool. + /// + public async Task RegisterResourceStreamingAsync( + string vaultId, + string entryId, + MetaData metaData, + Func writeContent, + CancellationToken cancellationToken = default) + { + try + { + var directoryVault = _vaults.Get(vaultId); + if (directoryVault != null) + { + var written = await directoryVault.AddEntryStreamingAsync(entryId, metaData, writeContent, cancellationToken); + _logger.LogInformation( + "Streamed {Bytes} bytes into vault {VaultId} entry {EntryId} (no whole-file buffer).", + written, vaultId, entryId); + return true; + } + } + catch (Exception ex) + { + // Swallow and return false, matching RegisterResourceAsync. Logged (unlike the buffered + // path) because a streamed write failure can leave a partial backing file worth noticing. + _logger.LogError(ex, "RegisterResourceStreamingAsync failed for vault {VaultId} entry {EntryId}", vaultId, entryId); + } + + return false; + } + /// /// Removes a resource from a specific vault. Returns null if the vault does not exist, /// false if the entry was not found, true if the entry was removed. Distinguishing diff --git a/DeepDrftContent/FileDatabase/Services/MediaVault.cs b/DeepDrftContent/FileDatabase/Services/MediaVault.cs index 2ccf294..8e88b2c 100644 --- a/DeepDrftContent/FileDatabase/Services/MediaVault.cs +++ b/DeepDrftContent/FileDatabase/Services/MediaVault.cs @@ -56,6 +56,37 @@ public abstract class MediaVault : VaultIndexDirectory await FileUtils.PutFileAsync(mediaPath, buffer); } + /// + /// Streams an entry's bytes into the vault without ever materializing the whole file in memory: + /// records the supplied in the index, then invokes + /// to emit bytes directly to the backing . + /// The metadata is supplied by the caller (there is no in-memory to infer + /// it from) — the store path (upload / replace-audio) sources its bytes from a staging file, not a + /// buffer. Returns the number of bytes written, for the caller to log. + /// + /// Index-then-file ordering matches ; a mid-write failure therefore + /// leaves an index entry over a partial/missing file, the same exposure the buffered path has on + /// an I/O fault. The caller treats a thrown exception as a failed register. + /// + public async Task AddEntryStreamingAsync( + string entryId, + MetaData metaData, + Func writeContent, + CancellationToken cancellationToken = default) + { + var mediaPath = GetMediaPathFromEntryKey(entryId, metaData.Extension); + + await AddToIndexAsync(entryId, metaData); + + await using var fileStream = new FileStream( + mediaPath, FileMode.Create, FileAccess.Write, FileShare.None, + bufferSize: 81920, useAsync: true); + await writeContent(fileStream, cancellationToken); + await fileStream.FlushAsync(cancellationToken); + + return fileStream.Length; + } + /// /// Retrieves an entry from the vault (MediaVaultType inferred from T) /// diff --git a/DeepDrftContent/Processors/AudioProcessor.cs b/DeepDrftContent/Processors/AudioProcessor.cs index 7a0d919..8619ebe 100644 --- a/DeepDrftContent/Processors/AudioProcessor.cs +++ b/DeepDrftContent/Processors/AudioProcessor.cs @@ -7,12 +7,22 @@ namespace DeepDrftContent.Processors; /// public class AudioProcessor { + // Header parsing never needs the audio body. Read the file in 64 KB steps until the data-chunk + // header is locatable, capping the window so a pathological file with an enormous pre-data header + // cannot drive an unbounded allocation — such a file simply falls through to default metadata and + // passthrough storage, the same outcome as any unparseable WAV. + private const int HeaderWindowStep = 64 * 1024; + private const int HeaderWindowCap = 8 * 1024 * 1024; + /// - /// Processes a WAV file and creates an AudioBinary object + /// Processes a WAV file into a store plan: extracts metadata from a + /// bounded header window (never the whole file) and returns a streamed writer for the canonical + /// vault bytes. Standard PCM is stored verbatim (passthrough copy); EXTENSIBLE-PCM / IEEE-float / + /// padded-container WAVs are normalized to a plain 44-byte standard-PCM WAV, written progressively + /// so the vault only ever holds a format the streaming pipeline already handles. /// /// Path to the WAV file - /// AudioBinary object with metadata - public async Task ProcessWavFileAsync(string filePath) + public async Task ProcessWavFileAsync(string filePath, CancellationToken cancellationToken = default) { if (!File.Exists(filePath)) { @@ -26,30 +36,197 @@ public class AudioProcessor try { - var buffer = await File.ReadAllBytesAsync(filePath); - var wavInfo = ExtractWavMetadata(buffer); + var fileLength = new FileInfo(filePath).Length; + var window = await ReadWavHeaderWindowAsync(filePath, cancellationToken); + var wavInfo = ExtractWavMetadata(window); - // EXTENSIBLE-PCM is byte-compatible with standard PCM but carries a 40+ byte fmt chunk - // the streaming pipeline never expects. Normalize to a plain 44-byte PCM WAV at storage - // time so the vault only ever holds standard PCM and the client decode path stays unchanged. - var storedBuffer = wavInfo.IsExtensible ? NormalizeToStandardPcm(buffer, wavInfo) : buffer; + if (!wavInfo.IsExtensible) + { + // Standard PCM (or the default-fallback path, which reports IsExtensible = false): + // the source bytes are already a format the pipeline handles, so store them verbatim. + return ProcessedAudio.Passthrough(filePath, ".wav", wavInfo.Duration, wavInfo.Bitrate, fileLength); + } - var parameters = new AudioBinaryParams( - Buffer: storedBuffer, - Size: storedBuffer.Length, - Extension: ".wav", - Duration: wavInfo.Duration, - Bitrate: wavInfo.Bitrate - ); + // EXTENSIBLE → streamed normalization. The output data size is derivable from the source + // data size alone (no body read needed): verbatim keeps it, float drops 1 byte per sample + // (4→3), padded keeps only the valid bytes per container sample. + var dataStart = (long)wavInfo.DataChunkPos + 8; + var available = fileLength - dataStart; + var srcDataSize = Math.Min((long)wavInfo.DataSize, available); - return new AudioBinary(parameters); + NormalizeMode mode; + int outBitsPerSample; + long outDataSize; + int containerBytes = 0; + int validBytes = 0; + if (wavInfo.IsFloat) + { + mode = NormalizeMode.Float; + outBitsPerSample = 24; + outDataSize = (srcDataSize / 4) * 3; + } + else if (wavInfo.IsPaddedContainer) + { + mode = NormalizeMode.Padded; + outBitsPerSample = wavInfo.BitsPerSample; + containerBytes = wavInfo.ContainerBitsPerSample / 8; + validBytes = wavInfo.BitsPerSample / 8; + outDataSize = (srcDataSize / containerBytes) * validBytes; + } + else + { + mode = NormalizeMode.Verbatim; + outBitsPerSample = wavInfo.BitsPerSample; + outDataSize = srcDataSize; + } + + var channels = wavInfo.Channels; + var sampleRate = wavInfo.SampleRate; + + return new ProcessedAudio( + ".wav", wavInfo.Duration, wavInfo.Bitrate, 44 + outDataSize, + (destination, ct) => WriteNormalizedWavAsync( + filePath, dataStart, srcDataSize, channels, sampleRate, outBitsPerSample, + outDataSize, mode, containerBytes, validBytes, destination, ct)); } - catch (Exception ex) + catch (Exception ex) when (ex is not OperationCanceledException) { throw new InvalidOperationException($"Failed to process WAV file: {ex.Message}", ex); } } + /// + /// Reads only enough of the file to contain the fmt chunk and the data chunk's 8-byte header, so + /// metadata parsing never loads the (potentially ~GB) audio body. Grows the window in 64 KB steps + /// until the data chunk is locatable or EOF/ is hit. + /// + private static async Task ReadWavHeaderWindowAsync(string filePath, CancellationToken ct) + { + await using var fs = new FileStream( + filePath, FileMode.Open, FileAccess.Read, FileShare.Read, + bufferSize: HeaderWindowStep, useAsync: true); + + using var ms = new MemoryStream(); + var buffer = new byte[HeaderWindowStep]; + while (ms.Length < HeaderWindowCap) + { + var read = await fs.ReadAsync(buffer, ct); + if (read == 0) + break; + ms.Write(buffer, 0, read); + + // FindChunk returns -1 on a partial window (the data chunk isn't reachable yet), so keep + // reading until it is found or the cap/EOF is hit. On normal files the data chunk header + // sits within the first 64 KB, so this loop runs exactly once. + var soFar = ms.ToArray(); + if (FindChunk(soFar, "data") >= 0) + return soFar; + } + + return ms.ToArray(); + } + + /// + /// Writes a normalized standard-PCM WAV to : the 44-byte header + /// followed by the data region streamed from the source in bounded, sample-aligned chunks. No + /// whole-file buffer is ever held — peak memory is O(chunk), independent of duration. + /// + private async Task WriteNormalizedWavAsync( + string sourcePath, long dataStart, long srcDataSize, + int channels, int sampleRate, int outBitsPerSample, long outDataSize, + NormalizeMode mode, int containerBytes, int validBytes, + Stream destination, CancellationToken ct) + { + var header = BuildStandardPcmHeader(channels, sampleRate, outBitsPerSample, outDataSize); + await destination.WriteAsync(header, ct); + + await using var src = new FileStream( + sourcePath, FileMode.Open, FileAccess.Read, FileShare.Read, + bufferSize: 81920, useAsync: true); + src.Seek(dataStart, SeekOrigin.Begin); + + switch (mode) + { + case NormalizeMode.Verbatim: + await CopyBoundedAsync(src, destination, srcDataSize, ct); + break; + case NormalizeMode.Float: + // Each 4-byte float sample becomes 3 bytes of 24-bit PCM. + await TransformBoundedAsync(src, destination, srcDataSize, unit: 4, + transform: (buf, len) => ConvertFloatTo24BitPcm(buf, 0, len), ct); + break; + case NormalizeMode.Padded: + await TransformBoundedAsync(src, destination, srcDataSize, unit: containerBytes, + transform: (buf, len) => RepackPaddedContainer(buf, 0, len, containerBytes * 8, validBytes * 8), ct); + break; + } + } + + /// Bounded copy of exactly from src to dest. + private static async Task CopyBoundedAsync(Stream src, Stream dest, long totalBytes, CancellationToken ct) + { + var buffer = new byte[81920]; + var remaining = totalBytes; + while (remaining > 0) + { + var want = (int)Math.Min(buffer.Length, remaining); + var read = await src.ReadAsync(buffer.AsMemory(0, want), ct); + if (read == 0) + break; + await dest.WriteAsync(buffer.AsMemory(0, read), ct); + remaining -= read; + } + } + + /// + /// Streams of source data through in + /// sample-aligned chunks, writing each transformed chunk to . The read + /// buffer is a multiple of ; leftover bytes that do not complete a sample + /// are carried into the next read, and a final partial sample is dropped (matching the + /// whole-buffer transforms' integer-division behavior). + /// + private static async Task TransformBoundedAsync( + Stream src, Stream dest, long totalBytes, int unit, + Func transform, CancellationToken ct) + { + var bufLen = Math.Max(unit, (81920 / unit) * unit); + var buffer = new byte[bufLen]; + var remaining = totalBytes; + var carried = 0; + while (remaining > 0) + { + var want = (int)Math.Min(bufLen - carried, remaining); + if (want == 0) + break; + var read = await src.ReadAsync(buffer.AsMemory(carried, want), ct); + if (read == 0) + break; + remaining -= read; + + var filled = carried + read; + var whole = (filled / unit) * unit; + if (whole > 0) + { + var output = transform(buffer, whole); + await dest.WriteAsync(output, ct); + } + + carried = filled - whole; + if (carried > 0) + Array.Copy(buffer, whole, buffer, 0, carried); + } + } + + private enum NormalizeMode + { + /// Sample bytes already standard PCM (EXTENSIBLE-PCM, depth == container width). + Verbatim, + /// IEEE float samples converted to 24-bit PCM. + Float, + /// Padded container (e.g. 24-in-32) re-packed to the valid depth. + Padded + } + /// /// Extracts the raw PCM data region and format parameters from a WAV buffer, reusing the /// same chunk-walk and validation as metadata extraction. Returns null if the buffer is not @@ -317,50 +494,17 @@ public class AudioProcessor } /// - /// Rebuilds an EXTENSIBLE WAV as a canonical 44-byte-header standard PCM WAV (audioFormat = 1) - /// so the vault only ever holds a format the streaming pipeline already handles. Three source - /// shapes are normalized: - /// - /// EXTENSIBLE-PCM (depth == container): sample bytes are byte-identical to standard PCM and - /// copied verbatim; only the header is replaced. - /// IEEE float: 32-bit float samples are converted to 24-bit signed integer PCM. - /// Padded container (e.g. 24-in-32): the padding/sign-extension bytes are stripped, keeping - /// the lowest valid bytes per sample. - /// - /// The output header always reports the valid bit depth (). + /// Builds the canonical 44-byte standard-PCM WAV header (audioFormat = 1) for a normalized stream. + /// The body is written separately so no whole-file buffer is allocated; this only emits the header + /// the streaming pipeline expects, reporting the valid (post-normalization) bit depth. /// - private byte[] NormalizeToStandardPcm(byte[] buffer, WavMetadata metadata) + private static byte[] BuildStandardPcmHeader(int channels, int sampleRate, int outBitsPerSample, long dataSize) { - // Clamp the declared data size to what is actually present; some encoders overshoot. - var dataStart = metadata.DataChunkPos + 8; - var available = buffer.Length - dataStart; - var srcDataSize = Math.Min(metadata.DataSize, available); - - byte[] dataBytes; - int outBitsPerSample; - if (metadata.IsFloat) - { - dataBytes = ConvertFloatTo24BitPcm(buffer, dataStart, srcDataSize); - outBitsPerSample = 24; - } - else if (metadata.IsPaddedContainer) - { - dataBytes = RepackPaddedContainer(buffer, dataStart, srcDataSize, metadata.ContainerBitsPerSample, metadata.BitsPerSample); - outBitsPerSample = metadata.BitsPerSample; - } - else - { - dataBytes = new byte[srcDataSize]; - Array.Copy(buffer, dataStart, dataBytes, 0, srcDataSize); - outBitsPerSample = metadata.BitsPerSample; - } - - var dataSize = dataBytes.Length; const int headerSize = 44; - var result = new byte[headerSize + dataSize]; + var result = new byte[headerSize]; - var blockAlign = (ushort)(metadata.Channels * (outBitsPerSample / 8)); - var byteRate = (uint)(metadata.SampleRate * blockAlign); + var blockAlign = (ushort)(channels * (outBitsPerSample / 8)); + var byteRate = (uint)(sampleRate * blockAlign); // RIFF header System.Text.Encoding.ASCII.GetBytes("RIFF").CopyTo(result, 0); @@ -371,8 +515,8 @@ public class AudioProcessor System.Text.Encoding.ASCII.GetBytes("fmt ").CopyTo(result, 12); BitConverter.GetBytes((uint)16).CopyTo(result, 16); BitConverter.GetBytes((ushort)1).CopyTo(result, 20); // audioFormat = PCM - BitConverter.GetBytes((ushort)metadata.Channels).CopyTo(result, 22); - BitConverter.GetBytes((uint)metadata.SampleRate).CopyTo(result, 24); + BitConverter.GetBytes((ushort)channels).CopyTo(result, 22); + BitConverter.GetBytes((uint)sampleRate).CopyTo(result, 24); BitConverter.GetBytes(byteRate).CopyTo(result, 28); BitConverter.GetBytes(blockAlign).CopyTo(result, 32); BitConverter.GetBytes((ushort)outBitsPerSample).CopyTo(result, 34); @@ -381,8 +525,6 @@ public class AudioProcessor System.Text.Encoding.ASCII.GetBytes("data").CopyTo(result, 36); BitConverter.GetBytes((uint)dataSize).CopyTo(result, 40); - Array.Copy(dataBytes, 0, result, headerSize, dataSize); - return result; } @@ -459,7 +601,7 @@ public class AudioProcessor /// /// Finds a chunk in the WAV file buffer with proper alignment handling /// - private int FindChunk(byte[] buffer, string chunkId) + private static int FindChunk(byte[] buffer, string chunkId) { var chunkBytes = System.Text.Encoding.ASCII.GetBytes(chunkId); int offset = 12; // Start after RIFF header diff --git a/DeepDrftContent/Processors/AudioProcessorRouter.cs b/DeepDrftContent/Processors/AudioProcessorRouter.cs index e1242a7..2a7675d 100644 --- a/DeepDrftContent/Processors/AudioProcessorRouter.cs +++ b/DeepDrftContent/Processors/AudioProcessorRouter.cs @@ -24,18 +24,18 @@ public class AudioProcessorRouter } /// - /// Processes with the processor matching its extension, returning an - /// carrying the stored bytes and extracted metadata. Throws - /// for unsupported extensions. + /// Processes with the processor matching its extension, returning a + /// store plan (extracted metadata plus a streamed writer for the + /// canonical vault bytes). Throws for unsupported extensions. /// - public async Task ProcessAudioFileAsync(string filePath) + public async Task ProcessAudioFileAsync(string filePath, CancellationToken cancellationToken = default) { var ext = Path.GetExtension(filePath).ToLowerInvariant(); return ext switch { - ".wav" => await _wavProcessor.ProcessWavFileAsync(filePath), - ".mp3" => await _mp3Processor.ProcessMp3FileAsync(filePath), - ".flac" => await _flacProcessor.ProcessFlacFileAsync(filePath), + ".wav" => await _wavProcessor.ProcessWavFileAsync(filePath, cancellationToken), + ".mp3" => await _mp3Processor.ProcessMp3FileAsync(filePath, cancellationToken), + ".flac" => await _flacProcessor.ProcessFlacFileAsync(filePath, cancellationToken), _ => throw new ArgumentException($"Unsupported audio format: {ext}", nameof(filePath)), }; } diff --git a/DeepDrftContent/Processors/AudioStoreStream.cs b/DeepDrftContent/Processors/AudioStoreStream.cs new file mode 100644 index 0000000..af7006c --- /dev/null +++ b/DeepDrftContent/Processors/AudioStoreStream.cs @@ -0,0 +1,58 @@ +namespace DeepDrftContent.Processors; + +/// +/// Bounded-buffer streaming primitives shared by the audio processors on the store path. None of +/// these hold the whole file in memory: copies move a fixed window at a time, and the header read +/// caps its allocation regardless of file size. +/// +internal static class AudioStoreStream +{ + private const int CopyBufferSize = 81920; // 80 KB — matches the controller staging copy. + + /// + /// Bounded disk-to-disk copy of into . + /// Used for passthrough formats whose stored bytes equal the source bytes. Hand-rolled rather than + /// because FileStream's override writes in 128 KB + /// blocks; this keeps every write at or below , so peak managed memory + /// is provably O(buffer), never O(filesize). + /// + public static async Task CopyFileAsync(string sourcePath, Stream destination, CancellationToken ct) + { + await using var src = new FileStream( + sourcePath, FileMode.Open, FileAccess.Read, FileShare.Read, + bufferSize: CopyBufferSize, useAsync: true); + + var buffer = new byte[CopyBufferSize]; + int read; + while ((read = await src.ReadAsync(buffer, ct)) > 0) + { + await destination.WriteAsync(buffer.AsMemory(0, read), ct); + } + } + + /// + /// Reads at most bytes from the start of — enough + /// for header/metadata parsing without loading the (potentially ~GB) body. Bounds the allocation + /// at min(cap, fileLength). Size-based metadata (e.g. average bitrate) must use the true + /// file length, supplied separately, not the prefix length. + /// + public static async Task ReadPrefixAsync(string path, long cap, CancellationToken ct) + { + await using var fs = new FileStream( + path, FileMode.Open, FileAccess.Read, FileShare.Read, + bufferSize: CopyBufferSize, useAsync: true); + + var length = (int)Math.Min(cap, fs.Length); + var buffer = new byte[length]; + var total = 0; + while (total < length) + { + var read = await fs.ReadAsync(buffer.AsMemory(total, length - total), ct); + if (read == 0) + break; + total += read; + } + + return total == length ? buffer : buffer[..total]; + } +} diff --git a/DeepDrftContent/Processors/FlacAudioProcessor.cs b/DeepDrftContent/Processors/FlacAudioProcessor.cs index 23ffe71..154a97b 100644 --- a/DeepDrftContent/Processors/FlacAudioProcessor.cs +++ b/DeepDrftContent/Processors/FlacAudioProcessor.cs @@ -12,7 +12,11 @@ public class FlacAudioProcessor private const double FallbackDuration = 180.0; private const int FallbackBitrate = 1411; - public async Task ProcessFlacFileAsync(string filePath) + // STREAMINFO is mandatory and always the first metadata block, immediately after the 4-byte magic + // (data at offset 8, 34 bytes). A small prefix read covers it without loading the body. + private const long HeaderCap = 64 * 1024; + + public async Task ProcessFlacFileAsync(string filePath, CancellationToken cancellationToken = default) { if (!File.Exists(filePath)) { @@ -24,25 +28,21 @@ public class FlacAudioProcessor throw new ArgumentException("File must be a FLAC file", nameof(filePath)); } - var buffer = await File.ReadAllBytesAsync(filePath); - var meta = ExtractFlacMetadata(buffer); + var fileLength = new FileInfo(filePath).Length; + var window = await AudioStoreStream.ReadPrefixAsync(filePath, HeaderCap, cancellationToken); + var meta = ExtractFlacMetadata(window, fileLength); - var parameters = new AudioBinaryParams( - Buffer: buffer, - Size: buffer.Length, - Extension: ".flac", - Duration: meta.Duration, - Bitrate: meta.Bitrate); - - return new AudioBinary(parameters); + // FLAC is stored unmodified — passthrough the original bytes via a streamed disk-to-disk copy. + return ProcessedAudio.Passthrough(filePath, ".flac", meta.Duration, meta.Bitrate, fileLength); } /// /// Validates the fLaC magic and the leading STREAMINFO block, then computes duration from /// total-samples / sample-rate and average bitrate from file size. On any parse failure, logs a - /// warning and returns synthetic defaults — never throws. + /// warning and returns synthetic defaults — never throws. is the true + /// file size (the header window may be shorter), used for the average-bitrate computation. /// - private static FlacMetadata ExtractFlacMetadata(byte[] buffer) + private static FlacMetadata ExtractFlacMetadata(byte[] buffer, long fileLength) { try { @@ -84,7 +84,7 @@ public class FlacAudioProcessor var duration = (double)totalSamples / sampleRate; var bitrate = duration > 0 - ? (int)(buffer.LongLength * 8L / (duration * 1000)) + ? (int)(fileLength * 8L / (duration * 1000)) : FallbackBitrate; return new FlacMetadata { Duration = duration, Bitrate = bitrate }; diff --git a/DeepDrftContent/Processors/Mp3AudioProcessor.cs b/DeepDrftContent/Processors/Mp3AudioProcessor.cs index 398ea7d..ce91e44 100644 --- a/DeepDrftContent/Processors/Mp3AudioProcessor.cs +++ b/DeepDrftContent/Processors/Mp3AudioProcessor.cs @@ -25,7 +25,13 @@ public class Mp3AudioProcessor private const double FallbackDuration = 180.0; private const int FallbackBitrate = 320; - public async Task ProcessMp3FileAsync(string filePath) + // Metadata lives in the leading ID3v2 tag plus the first MPEG frame. Cap the header read so a + // large MP3 is not pulled into memory whole just to read it; a tag larger than this (very large + // embedded art) simply falls back to the CBR/default estimate, never an OOM. The body is stored + // by streaming the original file, not from this window. + private const long HeaderCap = 8 * 1024 * 1024; + + public async Task ProcessMp3FileAsync(string filePath, CancellationToken cancellationToken = default) { if (!File.Exists(filePath)) { @@ -37,24 +43,21 @@ public class Mp3AudioProcessor throw new ArgumentException("File must be an MP3 file", nameof(filePath)); } - var buffer = await File.ReadAllBytesAsync(filePath); - var meta = ExtractMp3Metadata(buffer); + var fileLength = new FileInfo(filePath).Length; + var window = await AudioStoreStream.ReadPrefixAsync(filePath, HeaderCap, cancellationToken); + var meta = ExtractMp3Metadata(window, fileLength); - var parameters = new AudioBinaryParams( - Buffer: buffer, - Size: buffer.Length, - Extension: ".mp3", - Duration: meta.Duration, - Bitrate: meta.Bitrate); - - return new AudioBinary(parameters); + // MP3 is stored unmodified — passthrough the original bytes via a streamed disk-to-disk copy. + return ProcessedAudio.Passthrough(filePath, ".mp3", meta.Duration, meta.Bitrate, fileLength); } /// /// Parses the first valid MPEG frame (after any ID3v2 tag) and any Xing/VBRI tag inside it. /// On any parse failure, logs a warning and returns synthetic defaults — never throws. + /// is the true file size (the header window may be shorter), used + /// for the CBR duration estimate. /// - private static Mp3Metadata ExtractMp3Metadata(byte[] buffer) + private static Mp3Metadata ExtractMp3Metadata(byte[] buffer, long fileLength) { try { @@ -65,7 +68,7 @@ public class Mp3AudioProcessor } var header = DecodeFrameHeader(buffer, frameStart); - var duration = ComputeDuration(buffer, frameStart, header); + var duration = ComputeDuration(buffer, frameStart, header, fileLength); return new Mp3Metadata { Duration = duration, Bitrate = header.BitrateKbps }; } @@ -202,7 +205,7 @@ public class Mp3AudioProcessor /// Computes duration from a Xing/Info or VBRI tag (accurate for VBR) when present; otherwise /// falls back to the CBR estimate fileSize / (bitrate_kbps * 125). Guards divide-by-zero. /// - private static double ComputeDuration(byte[] buffer, int frameStart, FrameHeader header) + private static double ComputeDuration(byte[] buffer, int frameStart, FrameHeader header, long fileLength) { var xingFrames = ReadXingFrameCount(buffer, frameStart, header); if (xingFrames > 0 && header.SampleRate > 0) @@ -216,10 +219,10 @@ public class Mp3AudioProcessor return (double)vbriFrames * header.SamplesPerFrame / header.SampleRate; } - // CBR fallback: bitrate_kbps * 1000 / 8 bytes per second = bitrate_kbps * 125. - // Exclude the ID3v2 tag bytes (everything before frameStart) from the estimate. + // CBR fallback: bitrate_kbps * 1000 / 8 bytes per second = bitrate_kbps * 125. Uses the true + // file length (not the bounded header window), excluding the ID3v2 tag bytes before frameStart. var bytesPerSecond = header.BitrateKbps * 125; - return bytesPerSecond > 0 ? (double)(buffer.Length - frameStart) / bytesPerSecond : FallbackDuration; + return bytesPerSecond > 0 ? (double)(fileLength - frameStart) / bytesPerSecond : FallbackDuration; } /// diff --git a/DeepDrftContent/Processors/ProcessedAudio.cs b/DeepDrftContent/Processors/ProcessedAudio.cs new file mode 100644 index 0000000..c6d8487 --- /dev/null +++ b/DeepDrftContent/Processors/ProcessedAudio.cs @@ -0,0 +1,68 @@ +namespace DeepDrftContent.Processors; + +/// +/// The product of processing an uploaded audio file on the store path: the metadata SQL and the +/// vault index need, plus a streamed writer that emits the canonical vault bytes to a destination +/// stream without ever materializing the whole file in a managed byte[]. +/// +/// This replaces the former whole-file AudioBinary as the processor output for upload / +/// replace-audio (Wave 1 OOM fix): passthrough formats (standard-PCM WAV, MP3, FLAC) stream the +/// source file straight to the destination, and EXTENSIBLE WAVs stream their normalization to +/// standard PCM. The vault load path still uses AudioBinary (a full buffer) — that +/// is the Wave 2 read path and is out of scope here. +/// +/// is invoked exactly once by the streaming vault register, against the +/// freshly opened backing . The writer re-opens the source file +/// itself, so the source (a staging file) must still exist when the register runs — it does, because +/// processing and registration are sequential within the store call, before the staging-file +/// finally cleanup. +/// +public sealed class ProcessedAudio +{ + /// The stored file extension (e.g. .wav, .mp3, .flac). + public string Extension { get; } + + /// Audio duration in seconds, extracted from the header. + public double Duration { get; } + + /// Audio bitrate in kbps, extracted from (or estimated for) the header. + public int Bitrate { get; } + + /// + /// The canonical stored byte count — computed from the header and file length, never by + /// buffering the body. Used only for diagnostics (confirming the streamed path was taken). + /// + public long Size { get; } + + private readonly Func _writeTo; + + public ProcessedAudio( + string extension, + double duration, + int bitrate, + long size, + Func writeTo) + { + Extension = extension; + Duration = duration; + Bitrate = bitrate; + Size = size; + _writeTo = writeTo; + } + + /// + /// Streams the canonical vault bytes to . Bounded-buffer — peak + /// managed memory is O(buffer), not O(filesize). + /// + public Task WriteToAsync(Stream destination, CancellationToken cancellationToken = default) + => _writeTo(destination, cancellationToken); + + /// + /// Builds a passthrough plan: the stored bytes are byte-identical to the source file (standard + /// PCM WAV, MP3, FLAC — no transcoding). The writer is a bounded disk-to-disk copy. + /// + public static ProcessedAudio Passthrough( + string sourcePath, string extension, double duration, int bitrate, long sourceLength) + => new(extension, duration, bitrate, sourceLength, + (destination, ct) => AudioStoreStream.CopyFileAsync(sourcePath, destination, ct)); +} diff --git a/DeepDrftContent/TrackContentService.cs b/DeepDrftContent/TrackContentService.cs index 208ba36..decd24e 100644 --- a/DeepDrftContent/TrackContentService.cs +++ b/DeepDrftContent/TrackContentService.cs @@ -40,13 +40,15 @@ public class TrackContentService string? album = null, string? genre = null, DateOnly? releaseDate = null, - string? originalFileName = null) + string? originalFileName = null, + CancellationToken cancellationToken = default) { try { - // Process the audio file (routed by extension) - var audioBinary = await _audioProcessorRouter.ProcessAudioFileAsync(audioFilePath); - if (audioBinary == null) + // Process the audio file (routed by extension). The returned plan carries metadata plus a + // streamed writer — no whole-file buffer (the store-path OOM fix). + var processed = await _audioProcessorRouter.ProcessAudioFileAsync(audioFilePath, cancellationToken); + if (processed == null) { throw new InvalidOperationException("Failed to process audio file"); } @@ -60,8 +62,11 @@ public class TrackContentService await _fileDatabase.CreateVaultAsync(VaultConstants.Tracks, MediaVaultType.Audio); } - // Store the audio in FileDatabase - var success = await _fileDatabase.RegisterResourceAsync(VaultConstants.Tracks, trackId, audioBinary); + // Stream the audio into the vault. The metadata is supplied directly (there is no in-memory + // AudioBinary on this path), and the bytes are written progressively from the staging file. + var metaData = MetaDataFactory.CreateAudioMetaData(trackId, processed.Extension, processed.Duration, processed.Bitrate); + var success = await _fileDatabase.RegisterResourceStreamingAsync( + VaultConstants.Tracks, trackId, metaData, processed.WriteToAsync, cancellationToken); if (!success) { throw new InvalidOperationException("Failed to store audio in FileDatabase"); @@ -77,7 +82,7 @@ public class TrackContentService OriginalFileName = originalFileName, // Persist the processor-extracted runtime to SQL so aggregate queries (total mix runtime) // need not touch the vault. Same value the high-res waveform compute reads downstream. - DurationSeconds = audioBinary.Duration + DurationSeconds = processed.Duration }; return trackEntity; @@ -100,34 +105,37 @@ public class TrackContentService string? album = null, string? genre = null, DateOnly? releaseDate = null, - string? originalFileName = null) => - AddTrackAsync(wavFilePath, trackName, artist, album, genre, releaseDate, originalFileName); + string? originalFileName = null, + CancellationToken cancellationToken = default) => + AddTrackAsync(wavFilePath, trackName, artist, album, genre, releaseDate, originalFileName, cancellationToken); /// /// Swaps the audio bytes for an existing track in place: processes a new audio file and /// re-registers it under the SAME in the tracks vault. The track's /// vault key — and therefore its SQL link, release membership, position, and metadata — is - /// untouched; only the binary changes. The new audio is written first; only on confirmed success - /// is a stale old backing file cleaned up. A cross-format replacement (e.g. .wav → .flac) leaves - /// the old file on disk under its former filename once the index is updated; the post-success - /// cleanup removes it. For a same-extension overwrite the register alone suffices — the file is - /// written in place. If the register fails the original audio is left intact and null is returned, - /// so the track remains playable. Returns the freshly stored on success - /// (so the caller can regenerate waveform data from the same bytes) — matching the FileDatabase - /// swallow-and-return-null contract. + /// untouched; only the binary changes. The new audio is streamed to the vault first; only on + /// confirmed success is a stale old backing file cleaned up. A cross-format replacement (e.g. + /// .wav → .flac) leaves the old file on disk under its former filename once the index is updated; + /// the post-success cleanup removes it. For a same-extension overwrite the register alone suffices. + /// If the register fails the original audio is left intact and null is returned, so the track + /// remains playable. Returns the freshly stored audio's duration on success (the caller + /// re-reads the vault for waveform regen and uses this for the SQL duration write) — matching the + /// FileDatabase swallow-and-return-null contract. The new bytes are never materialized in memory. /// - public async Task ReplaceTrackAudioAsync(string entryKey, string audioFilePath) + public async Task ReplaceTrackAudioAsync(string entryKey, string audioFilePath, CancellationToken cancellationToken = default) { try { - // Capture the old extension before touching the vault. After register the index - // will point to the new extension, so we need the old value now to detect a - // cross-format swap and clean up the stale file post-success. - var existing = await _fileDatabase.LoadResourceAsync(VaultConstants.Tracks, entryKey); - var oldExtension = existing?.Extension; + // Capture the old extension from the index metadata (not by loading the file — that would + // pull the whole old audio into memory). After register the index points to the new + // extension, so we need the old value now to detect a cross-format swap and clean up the + // stale file post-success. + var trackVault = _fileDatabase.GetVault(VaultConstants.Tracks); + var existingMeta = trackVault is null ? null : await trackVault.GetEntryMetadata(entryKey); + var oldExtension = existingMeta?.Extension; - var audioBinary = await _audioProcessorRouter.ProcessAudioFileAsync(audioFilePath); - if (audioBinary == null) + var processed = await _audioProcessorRouter.ProcessAudioFileAsync(audioFilePath, cancellationToken); + if (processed == null) { Console.WriteLine($"TrackContentService.ReplaceTrackAudioAsync: processing returned null for {entryKey}"); return null; @@ -138,9 +146,11 @@ public class TrackContentService await _fileDatabase.CreateVaultAsync(VaultConstants.Tracks, MediaVaultType.Audio); } - // Register the new audio. This upserts the index entry (new extension recorded) and - // writes the new file to disk. If this fails the original entry and file are untouched. - var success = await _fileDatabase.RegisterResourceAsync(VaultConstants.Tracks, entryKey, audioBinary); + // Stream the new audio in. This upserts the index entry (new extension recorded) and writes + // the new file to disk. If this fails the original entry and file are untouched. + var metaData = MetaDataFactory.CreateAudioMetaData(entryKey, processed.Extension, processed.Duration, processed.Bitrate); + var success = await _fileDatabase.RegisterResourceStreamingAsync( + VaultConstants.Tracks, entryKey, metaData, processed.WriteToAsync, cancellationToken); if (!success) { Console.WriteLine($"TrackContentService.ReplaceTrackAudioAsync: vault write failed for {entryKey}; original audio preserved"); @@ -153,7 +163,7 @@ public class TrackContentService // old path — RemoveResourceAsync would now resolve to the new extension and delete the // wrong file. Non-fatal: an orphaned old file is a disk-hygiene concern, not a // playback issue (the index no longer references it). - if (oldExtension != null && oldExtension != audioBinary.Extension) + if (oldExtension != null && oldExtension != processed.Extension) { var vault = _fileDatabase.GetVault(VaultConstants.Tracks); if (vault != null) @@ -172,7 +182,7 @@ public class TrackContentService } } - return audioBinary; + return processed.Duration; } catch (Exception ex) when (ex is not OperationCanceledException) { diff --git a/DeepDrftTests/AudioProcessorTests.cs b/DeepDrftTests/AudioProcessorTests.cs index edac1cc..eae9505 100644 --- a/DeepDrftTests/AudioProcessorTests.cs +++ b/DeepDrftTests/AudioProcessorTests.cs @@ -32,13 +32,18 @@ public class AudioProcessorTests [Test] public async Task StandardPcm_RoundTripsUnchanged() { - var path = await WriteWavAsync(BuildMinimalWav(channels: 2, sampleRate: 44100, bitsPerSample: 16, audioFormat: WaveFormatPcm)); + var source = BuildMinimalWav(channels: 2, sampleRate: 44100, bitsPerSample: 16, audioFormat: WaveFormatPcm); + var path = await WriteWavAsync(source); var audio = await new AudioProcessor().ProcessWavFileAsync(path); Assert.That(audio, Is.Not.Null); Assert.That(audio!.Duration, Is.GreaterThan(0.0)); Assert.That(audio.Bitrate, Is.GreaterThan(0)); + // Standard PCM is passthrough: the streamed bytes must be byte-identical to the source file. + var stored = await Materialize(audio); + Assert.That(stored, Is.EqualTo(source), "Standard PCM must be stored byte-identical (passthrough)"); + Assert.That(audio.Size, Is.EqualTo(source.Length), "Passthrough Size must equal the source length"); } [Test] @@ -54,7 +59,8 @@ public class AudioProcessorTests Assert.That(audio, Is.Not.Null); Assert.That(audio!.Duration, Is.GreaterThan(0.0)); Assert.That(audio.Bitrate, Is.GreaterThan(0)); - Assert.That(ReadFmtAudioFormat(audio.Buffer), Is.EqualTo(WaveFormatPcm), "Stored buffer must be standard PCM"); + var stored = await Materialize(audio); + Assert.That(ReadFmtAudioFormat(stored), Is.EqualTo(WaveFormatPcm), "Stored buffer must be standard PCM"); } [Test] @@ -70,17 +76,19 @@ public class AudioProcessorTests var audio = await new AudioProcessor().ProcessWavFileAsync(path); Assert.That(audio, Is.Not.Null); - Assert.That(ReadFmtBitsPerSample(audio!.Buffer), Is.EqualTo(16 + 8), "Float must convert to 24-bit PCM"); - Assert.That(ReadFmtAudioFormat(audio.Buffer), Is.EqualTo(WaveFormatPcm)); + var stored = await Materialize(audio!); + Assert.That(ReadFmtBitsPerSample(stored), Is.EqualTo(16 + 8), "Float must convert to 24-bit PCM"); + Assert.That(ReadFmtAudioFormat(stored), Is.EqualTo(WaveFormatPcm)); // 4 float samples (4 bytes each) → 4 PCM samples (3 bytes each) = 12 data bytes after the 44-byte header. - Assert.That(audio.Buffer.Length, Is.EqualTo(44 + 12)); + Assert.That(stored.Length, Is.EqualTo(44 + 12)); + Assert.That(audio!.Size, Is.EqualTo(44 + 12), "Computed Size must match the streamed byte count"); // Verify the converted sample values: (int)(sample * 8388607.0), clamped, little-endian 3 bytes. // 0.5f → 4194303 = 0x3FFFFF → FF FF 3F // -0.5f → -4194303 = 0xFFC00001 → 24-bit LE: 01 00 C0 // 1.0f → 8388607 = 0x7FFFFF → FF FF 7F // -1.0f → -8388607 = 0xFF800001 → 24-bit LE: 01 00 80 var expectedData = new byte[] { 0xFF, 0xFF, 0x3F, 0x01, 0x00, 0xC0, 0xFF, 0xFF, 0x7F, 0x01, 0x00, 0x80 }; - var actualData = audio.Buffer[44..]; + var actualData = stored[44..]; Assert.That(actualData, Is.EqualTo(expectedData), "Float samples must be converted to 24-bit PCM correctly"); } @@ -97,17 +105,18 @@ public class AudioProcessorTests var audio = await new AudioProcessor().ProcessWavFileAsync(path); Assert.That(audio, Is.Not.Null); - Assert.That(ReadFmtBitsPerSample(audio!.Buffer), Is.EqualTo(24), "Padded container must repack to 24-bit"); - Assert.That(ReadFmtAudioFormat(audio.Buffer), Is.EqualTo(WaveFormatPcm)); + var stored = await Materialize(audio!); + Assert.That(ReadFmtBitsPerSample(stored), Is.EqualTo(24), "Padded container must repack to 24-bit"); + Assert.That(ReadFmtAudioFormat(stored), Is.EqualTo(WaveFormatPcm)); // 4 container samples (4 bytes each) → 4 PCM samples (3 bytes each) = 12 data bytes. - Assert.That(audio.Buffer.Length, Is.EqualTo(44 + 12)); + Assert.That(stored.Length, Is.EqualTo(44 + 12)); // Verify the repacked sample values: lowest 3 bytes of each 4-byte little-endian container. // 0x123456 → LE 4 bytes: 56 34 12 00 → keep 3: 56 34 12 // 0xFFEDCBA9 → LE 4 bytes: A9 CB ED FF → keep 3: A9 CB ED // 0x000001 → LE 4 bytes: 01 00 00 00 → keep 3: 01 00 00 // 0xFF800000 → LE 4 bytes: 00 00 80 FF → keep 3: 00 00 80 var expectedData = new byte[] { 0x56, 0x34, 0x12, 0xA9, 0xCB, 0xED, 0x01, 0x00, 0x00, 0x00, 0x00, 0x80 }; - var actualData = audio.Buffer[44..]; + var actualData = stored[44..]; Assert.That(actualData, Is.EqualTo(expectedData), "Padded 24-in-32 samples must strip the padding byte correctly"); } @@ -265,6 +274,15 @@ public class AudioProcessorTests // -- helpers -------------------------------------------------------------------------------- + /// Streams a store plan into memory so its canonical bytes + /// can be asserted — the store path no longer hands back a materialized buffer. + private static async Task Materialize(ProcessedAudio audio) + { + using var ms = new MemoryStream(); + await audio.WriteToAsync(ms); + return ms.ToArray(); + } + /// /// Synthesises a minimal valid MPEG1 Layer III CBR MP3 buffer: one frame header plus enough body /// bytes for the frame, with an optional Xing VBR header in the side-information region. The body diff --git a/DeepDrftTests/AudioStoreStreamingTests.cs b/DeepDrftTests/AudioStoreStreamingTests.cs new file mode 100644 index 0000000..765968e --- /dev/null +++ b/DeepDrftTests/AudioStoreStreamingTests.cs @@ -0,0 +1,389 @@ +using System.Text; +using DeepDrftContent; +using DeepDrftContent.Constants; +using DeepDrftContent.FileDatabase.Models; +using DeepDrftContent.Processors; +using FileDb = DeepDrftContent.FileDatabase.Services.FileDatabase; + +namespace DeepDrftTests; + +/// +/// Tests for the streamed audio store path (Wave 1 OOM fix): processors emit a +/// plan whose body is written to the vault without ever materializing +/// the whole file in a managed byte[]. Covers passthrough byte-identity (standard-PCM WAV, +/// MP3, FLAC), streamed WAV normalization (EXTENSIBLE), the new streaming vault register round-trip, +/// and the memory-bounding contract of the store primitive (bounded-buffer, sequential, forward-only +/// writes — never a single whole-file write). +/// +[TestFixture] +public class AudioStoreStreamingTests +{ + private const ushort WaveFormatPcm = 0x0001; + private const ushort WaveFormatExtensible = 0xFFFE; + + private string _testDir = string.Empty; + + [SetUp] + public void SetUp() + { + _testDir = Path.Combine(Path.GetTempPath(), "AudioStoreStreamingTests", Guid.NewGuid().ToString()); + Directory.CreateDirectory(_testDir); + } + + [TearDown] + public void TearDown() + { + try { Directory.Delete(_testDir, recursive: true); } + catch { /* Best-effort cleanup — ignore failures */ } + } + + private static AudioProcessorRouter Router() => + new(new AudioProcessor(), new Mp3AudioProcessor(), new FlacAudioProcessor()); + + private static TrackContentService Content(FileDb db) => new(db, Router()); + + // -- End-to-end store byte-identity (passthrough) ----------------------------------------- + + [Test] + public async Task StandardPcmWav_StoredByteIdenticalToSource() + { + var source = BuildPcmWav(channels: 2, sampleRate: 44100, bitsPerSample: 16, dataBytes: 200_000); + var path = await WriteAsync(source, ".wav"); + var db = await FileDb.FromAsync(_testDir); + + var content = Content(db!); + var entity = await content.AddTrackAsync(path, "Track", "Artist"); + Assert.That(entity, Is.Not.Null); + + var stored = await content.GetAudioBinaryAsync(entity!.EntryKey); + Assert.That(stored, Is.Not.Null); + Assert.That(stored!.Buffer, Is.EqualTo(source), "Standard PCM must be stored byte-identical"); + Assert.That(stored.Duration, Is.GreaterThan(0.0)); + } + + [Test] + public async Task Mp3_StoredByteIdenticalToSource_MetadataCorrect() + { + var source = BuildMinimalMp3(); + var path = await WriteAsync(source, ".mp3"); + var db = await FileDb.FromAsync(_testDir); + + var content = Content(db!); + var entity = await content.AddTrackAsync(path, "Track", "Artist"); + Assert.That(entity, Is.Not.Null); + + var stored = await content.GetAudioBinaryAsync(entity!.EntryKey); + Assert.Multiple(() => + { + Assert.That(stored, Is.Not.Null); + Assert.That(stored!.Extension, Is.EqualTo(".mp3")); + Assert.That(stored.Buffer, Is.EqualTo(source), "MP3 must be stored byte-identical (no transcode)"); + Assert.That(stored.Bitrate, Is.EqualTo(128)); + }); + } + + [Test] + public async Task Flac_StoredByteIdenticalToSource_MetadataCorrect() + { + var source = BuildMinimalFlac(); + var path = await WriteAsync(source, ".flac"); + var db = await FileDb.FromAsync(_testDir); + + var content = Content(db!); + var entity = await content.AddTrackAsync(path, "Track", "Artist"); + Assert.That(entity, Is.Not.Null); + + var stored = await content.GetAudioBinaryAsync(entity!.EntryKey); + Assert.Multiple(() => + { + Assert.That(stored, Is.Not.Null); + Assert.That(stored!.Extension, Is.EqualTo(".flac")); + Assert.That(stored.Buffer, Is.EqualTo(source), "FLAC must be stored byte-identical (no transcode)"); + Assert.That(stored.Duration, Is.GreaterThan(0.0)); + }); + } + + // -- Streamed normalization --------------------------------------------------------------- + + [Test] + public async Task ExtensibleFloatWav_StoredAsNormalizedStandardPcm() + { + // A >80 KB float data region forces the streamed transform across multiple bounded chunks. + var floatData = BuildFloatRamp(sampleCount: 40_000); // 160 000 bytes in, 120 000 bytes out (24-bit) + var source = BuildExtensibleWav(channels: 2, sampleRate: 44100, containerBits: 32, validBits: 32, + subFormatTag: 0x0003, sampleData: floatData); + var path = await WriteAsync(source, ".wav"); + var db = await FileDb.FromAsync(_testDir); + + var content = Content(db!); + var entity = await content.AddTrackAsync(path, "Track", "Artist"); + Assert.That(entity, Is.Not.Null); + + var stored = await content.GetAudioBinaryAsync(entity!.EntryKey); + Assert.That(stored, Is.Not.Null); + Assert.Multiple(() => + { + Assert.That(BitConverter.ToUInt16(stored!.Buffer, 20), Is.EqualTo(WaveFormatPcm), + "EXTENSIBLE must be normalized to standard PCM (audioFormat = 1)"); + Assert.That(BitConverter.ToUInt16(stored.Buffer, 34), Is.EqualTo(24), "Float must normalize to 24-bit"); + Assert.That(stored.Buffer.Length, Is.EqualTo(44 + (floatData.Length / 4) * 3), + "Output size = 44-byte header + 3 bytes per float sample"); + }); + } + + // -- Streaming vault register round-trip --------------------------------------------------- + + [Test] + public async Task RegisterResourceStreamingAsync_RoundTrips() + { + var db = await FileDb.FromAsync(_testDir); + await db!.CreateVaultAsync(VaultConstants.Tracks, MediaVaultType.Audio); + + var payload = Enumerable.Range(0, 50_000).Select(i => (byte)(i % 256)).ToArray(); + var meta = MetaDataFactory.CreateAudioMetaData("entry-1", ".wav", 12.5, 1411); + + var ok = await db.RegisterResourceStreamingAsync( + VaultConstants.Tracks, "entry-1", meta, + (dest, ct) => dest.WriteAsync(payload, ct).AsTask()); + Assert.That(ok, Is.True); + + var loaded = await db.LoadResourceAsync(VaultConstants.Tracks, "entry-1"); + Assert.Multiple(() => + { + Assert.That(loaded, Is.Not.Null); + Assert.That(loaded!.Buffer, Is.EqualTo(payload), "Streamed bytes must round-trip exactly"); + Assert.That(loaded.Duration, Is.EqualTo(12.5)); + Assert.That(loaded.Bitrate, Is.EqualTo(1411)); + }); + } + + [Test] + public async Task RegisterResourceStreamingAsync_UnknownVault_ReturnsFalse() + { + var db = await FileDb.FromAsync(_testDir); + var meta = MetaDataFactory.CreateAudioMetaData("e", ".wav", 1.0, 1411); + + var ok = await db!.RegisterResourceStreamingAsync( + "does-not-exist", "e", meta, (dest, ct) => Task.CompletedTask); + + Assert.That(ok, Is.False, "Register into a missing vault must swallow and return false"); + } + + // -- Memory-bounding of the store primitive ----------------------------------------------- + + [Test] + public async Task WriteToAsync_StreamsInBoundedSequentialChunks_NotOneWholeFileWrite() + { + // A multi-hundred-KB passthrough file: a buffered implementation would issue one giant write of + // the whole body; the streamed primitive must write in bounded, forward-only chunks. + var source = BuildPcmWav(channels: 2, sampleRate: 44100, bitsPerSample: 16, dataBytes: 600_000); + var path = await WriteAsync(source, ".wav"); + + var processed = await new AudioProcessor().ProcessWavFileAsync(path); + Assert.That(processed, Is.Not.Null); + + var probe = new BoundedWriteProbeStream(); + await processed!.WriteToAsync(probe); + + Assert.Multiple(() => + { + Assert.That(probe.TotalBytes, Is.EqualTo(source.Length), "All bytes must be written"); + Assert.That(probe.WriteCount, Is.GreaterThan(1), "Body must be streamed in multiple chunks"); + Assert.That(probe.MaxWriteSize, Is.LessThanOrEqualTo(81920), + "No single write may exceed the bounded buffer — i.e. the whole file is never buffered"); + }); + } + + [Test] + public async Task WriteToAsync_NormalizedFloat_StreamsToForwardOnlyStream() + { + // The normalized path seeks the *source* but must only write the destination sequentially. + var floatData = BuildFloatRamp(sampleCount: 30_000); + var source = BuildExtensibleWav(channels: 2, sampleRate: 44100, containerBits: 32, validBits: 32, + subFormatTag: 0x0003, sampleData: floatData); + var path = await WriteAsync(source, ".wav"); + + var processed = await new AudioProcessor().ProcessWavFileAsync(path); + Assert.That(processed, Is.Not.Null); + + var probe = new BoundedWriteProbeStream(); + await processed!.WriteToAsync(probe); + + Assert.Multiple(() => + { + Assert.That(probe.TotalBytes, Is.EqualTo(44 + (floatData.Length / 4) * 3)); + Assert.That(probe.WriteCount, Is.GreaterThan(1), "Header + multiple transformed body chunks"); + Assert.That(probe.MaxWriteSize, Is.LessThanOrEqualTo(81920)); + }); + } + + // -- builders ----------------------------------------------------------------------------- + + private async Task WriteAsync(byte[] bytes, string extension) + { + var path = Path.Combine(_testDir, Guid.NewGuid().ToString("N") + extension); + await File.WriteAllBytesAsync(path, bytes); + return path; + } + + private static byte[] BuildPcmWav(int channels, int sampleRate, int bitsPerSample, int dataBytes) + { + var blockAlign = (ushort)(channels * (bitsPerSample / 8)); + var byteRate = (uint)(sampleRate * blockAlign); + var data = new byte[dataBytes]; + for (var i = 0; i < data.Length; i++) data[i] = (byte)(i % 251); + + using var ms = new MemoryStream(); + using var w = new BinaryWriter(ms, Encoding.ASCII, leaveOpen: true); + w.Write(Encoding.ASCII.GetBytes("RIFF")); + w.Write((uint)(36 + data.Length)); + w.Write(Encoding.ASCII.GetBytes("WAVE")); + w.Write(Encoding.ASCII.GetBytes("fmt ")); + w.Write(16u); + w.Write(WaveFormatPcm); + w.Write((ushort)channels); + w.Write((uint)sampleRate); + w.Write(byteRate); + w.Write(blockAlign); + w.Write((ushort)bitsPerSample); + w.Write(Encoding.ASCII.GetBytes("data")); + w.Write((uint)data.Length); + w.Write(data); + w.Flush(); + return ms.ToArray(); + } + + private static byte[] BuildExtensibleWav( + int channels, int sampleRate, int containerBits, int validBits, ushort subFormatTag, byte[] sampleData) + { + var blockAlign = (ushort)(channels * (containerBits / 8)); + var byteRate = (uint)(sampleRate * blockAlign); + const uint fmtChunkSize = 40; + + using var ms = new MemoryStream(); + using var w = new BinaryWriter(ms, Encoding.ASCII, leaveOpen: true); + w.Write(Encoding.ASCII.GetBytes("RIFF")); + w.Write((uint)(36 + (fmtChunkSize - 16) + sampleData.Length)); + w.Write(Encoding.ASCII.GetBytes("WAVE")); + w.Write(Encoding.ASCII.GetBytes("fmt ")); + w.Write(fmtChunkSize); + w.Write(WaveFormatExtensible); + w.Write((ushort)channels); + w.Write((uint)sampleRate); + w.Write(byteRate); + w.Write(blockAlign); + w.Write((ushort)containerBits); + w.Write((ushort)22); // cbSize + w.Write((ushort)validBits); // wValidBitsPerSample + w.Write((uint)0); // channel mask + var guid = new byte[16]; + guid[0] = (byte)(subFormatTag & 0xFF); + guid[1] = (byte)((subFormatTag >> 8) & 0xFF); + w.Write(guid); + w.Write(Encoding.ASCII.GetBytes("data")); + w.Write((uint)sampleData.Length); + w.Write(sampleData); + w.Flush(); + return ms.ToArray(); + } + + private static byte[] BuildFloatRamp(int sampleCount) + { + var bytes = new byte[sampleCount * 4]; + for (var i = 0; i < sampleCount; i++) + { + var sample = (float)((i % 200) / 200.0 - 0.5); // a deterministic ramp in [-0.5, 0.5) + BitConverter.GetBytes(sample).CopyTo(bytes, i * 4); + } + return bytes; + } + + private static byte[] BuildMinimalMp3() + { + // One MPEG1 Layer III CBR frame: 128 kbps, 44.1 kHz, stereo. Body zero-filled (silence). + const int frameSize = 417; // floor(144 * 128000 / 44100) + var buffer = new byte[frameSize]; + buffer[0] = 0xFF; + buffer[1] = 0xFB; // sync + MPEG1 + Layer III + no CRC + buffer[2] = (byte)((9 << 4) | (0 << 2)); // bitrate index 9 (128 kbps), sample-rate index 0 (44.1 kHz) + buffer[3] = 0x00; // stereo + return buffer; + } + + private static byte[] BuildMinimalFlac() + { + using var ms = new MemoryStream(); + ms.Write(Encoding.ASCII.GetBytes("fLaC")); + ms.WriteByte(0x80); // last block, STREAMINFO + ms.WriteByte(0x00); + ms.WriteByte(0x00); + ms.WriteByte(34); + + var s = new byte[34]; + const int sampleRate = 44100; + const int channels = 2; + const int bitsPerSample = 16; + const long totalSamples = 44100L * 5; + s[10] = (byte)((sampleRate >> 12) & 0xFF); + s[11] = (byte)((sampleRate >> 4) & 0xFF); + var bps = bitsPerSample - 1; + s[12] = (byte)(((sampleRate & 0x0F) << 4) | (((channels - 1) & 0x07) << 1) | ((bps >> 4) & 0x01)); + s[13] = (byte)(((bps & 0x0F) << 4) | (int)((totalSamples >> 32) & 0x0F)); + s[14] = (byte)((totalSamples >> 24) & 0xFF); + s[15] = (byte)((totalSamples >> 16) & 0xFF); + s[16] = (byte)((totalSamples >> 8) & 0xFF); + s[17] = (byte)(totalSamples & 0xFF); + ms.Write(s); + + // Trailing zero bytes standing in for encoded frames (affect only the average-bitrate compute). + ms.Write(new byte[100_000]); + return ms.ToArray(); + } + + /// + /// A write-only, forward-only (non-seekable) stream that records how the store primitive writes: + /// the number of writes, the largest single write, and the total. Proves the body is streamed in + /// bounded, sequential chunks rather than buffered into one whole-file write. + /// + private sealed class BoundedWriteProbeStream : Stream + { + public int WriteCount { get; private set; } + public int MaxWriteSize { get; private set; } + public long TotalBytes { get; private set; } + + public override bool CanRead => false; + public override bool CanSeek => false; + public override bool CanWrite => true; + public override long Length => TotalBytes; + public override long Position { get => TotalBytes; set => throw new NotSupportedException(); } + + private void Record(int count) + { + WriteCount++; + if (count > MaxWriteSize) MaxWriteSize = count; + TotalBytes += count; + } + + public override void Write(byte[] buffer, int offset, int count) => Record(count); + + public override void Write(ReadOnlySpan buffer) => Record(buffer.Length); + + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + Record(buffer.Length); + return ValueTask.CompletedTask; + } + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + Record(count); + return Task.CompletedTask; + } + + public override void Flush() { } + public override Task FlushAsync(CancellationToken cancellationToken) => Task.CompletedTask; + public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException(); + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); + public override void SetLength(long value) => throw new NotSupportedException(); + } +} diff --git a/DeepDrftTests/TrackReplaceAudioTests.cs b/DeepDrftTests/TrackReplaceAudioTests.cs index 7ea818c..55c480b 100644 --- a/DeepDrftTests/TrackReplaceAudioTests.cs +++ b/DeepDrftTests/TrackReplaceAudioTests.cs @@ -68,9 +68,9 @@ public class TrackReplaceAudioTests var originalDuration = before!.Duration; var replacement = await WriteWavAsync(BuildMinimalPcmWav(6.0), ".wav"); - var newAudio = await content.ReplaceTrackAudioAsync(entryKey, replacement); + var newDuration = await content.ReplaceTrackAudioAsync(entryKey, replacement); - Assert.That(newAudio, Is.Not.Null, "Replace should return the freshly stored audio"); + Assert.That(newDuration, Is.Not.Null, "Replace should return the freshly stored audio's duration"); var after = await content.GetAudioBinaryAsync(entryKey); Assert.Multiple(() => @@ -78,8 +78,8 @@ public class TrackReplaceAudioTests Assert.That(after, Is.Not.Null, "The track must remain retrievable under the same EntryKey"); Assert.That(after!.Duration, Is.GreaterThan(originalDuration), "The retrieved audio must reflect the longer replacement, not the original"); - Assert.That(newAudio!.Duration, Is.EqualTo(after.Duration), - "The returned binary must match what is stored under the key"); + Assert.That(newDuration!.Value, Is.EqualTo(after.Duration), + "The returned duration must match what is stored under the key"); }); } @@ -101,9 +101,9 @@ public class TrackReplaceAudioTests Assert.That(wavFilesBefore, Is.Not.Empty, "Sanity: the original .wav backing file exists"); var replacement = await WriteFlacAsync(); - var newAudio = await content.ReplaceTrackAudioAsync(entryKey, replacement); + var newDuration = await content.ReplaceTrackAudioAsync(entryKey, replacement); - Assert.That(newAudio, Is.Not.Null); + Assert.That(newDuration, Is.Not.Null); Assert.Multiple(() => { Assert.That(Directory.GetFiles(vaultDir, "*.wav"), Is.Empty, @@ -135,12 +135,15 @@ public class TrackReplaceAudioTests Assert.That(staleHighRes, Is.Not.Null); var replacement = await WriteWavAsync(BuildMinimalPcmWav(20.0), ".wav"); - var newAudio = await content.ReplaceTrackAudioAsync(entryKey, replacement); - Assert.That(newAudio, Is.Not.Null); + var newDuration = await content.ReplaceTrackAudioAsync(entryKey, replacement); + Assert.That(newDuration, Is.Not.Null); - // Regen step (mirrors the orchestrator). - Assert.That(await waveforms.ComputeAndStoreAsync(newAudio!.Buffer, entryKey), Is.True); - Assert.That(await waveforms.ComputeAndStoreHighResAsync(newAudio.Buffer, entryKey, newAudio.Duration), Is.True); + // Regen step (mirrors the orchestrator, which re-reads the freshly stored audio from the vault + // rather than consuming an in-memory buffer the streamed store no longer hands back). + var newStored = await content.GetAudioBinaryAsync(entryKey); + Assert.That(newStored, Is.Not.Null); + Assert.That(await waveforms.ComputeAndStoreAsync(newStored!.Buffer, entryKey), Is.True); + Assert.That(await waveforms.ComputeAndStoreHighResAsync(newStored.Buffer, entryKey, newStored.Duration), Is.True); var freshHighRes = await waveforms.GetProfileAsync(entryKey, VaultConstants.TrackWaveforms); var freshProfile = await waveforms.GetProfileAsync(entryKey);