Merge Wave 1: stream audio store path to fix large-upload OOM into dev

This commit is contained in:
daniel-c-harvey
2026-06-25 15:59:43 -04:00
13 changed files with 1039 additions and 168 deletions
+10 -16
View File
@@ -138,7 +138,7 @@ public class UnifiedTrackService
}
var unpersisted = await _contentTrackContentService.AddTrackAsync(
tempFilePath, trackName, artist, album, genre, releaseDate, originalFileName: originalFileName);
tempFilePath, trackName, artist, album, genre, releaseDate, originalFileName: originalFileName, cancellationToken: ct);
if (unpersisted is null)
{
@@ -269,31 +269,25 @@ public class UnifiedTrackService
var entryKey = lookup.Value.EntryKey;
var newAudio = await _contentTrackContentService.ReplaceTrackAudioAsync(entryKey, tempFilePath);
if (newAudio is null)
var newDuration = await _contentTrackContentService.ReplaceTrackAudioAsync(entryKey, tempFilePath, ct);
if (newDuration is null)
{
_logger.LogWarning("ReplaceAudioAsync: content swap returned null for track {TrackId} ({EntryKey})", trackId, entryKey);
return Result.CreateFailResult("Failed to process and store the replacement audio.");
}
// The old waveform no longer matches the new bytes. Regenerate both datums in place; keyed
// by the same EntryKey, the re-run overwrites the stale data (proven re-runnable). The
// freshly stored buffer is the authoritative source — no re-read of the vault needed.
try
{
await _waveformProfileService.ComputeAndStoreAsync(newAudio.Buffer, entryKey);
await _waveformProfileService.ComputeAndStoreHighResAsync(newAudio.Buffer, entryKey, newAudio.Duration);
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
_logger.LogError(ex, "ReplaceAudioAsync: waveform regen failed for {EntryKey}; replace unaffected.", entryKey);
}
// The old waveform no longer matches the new bytes. Regenerate both datums in place, keyed by
// the same EntryKey (the re-run overwrites the stale data). The store path no longer hands back
// a buffer, so the waveform compute re-reads the freshly stored audio from the vault — the same
// path the upload uses. That re-read is whole-file (Wave 2, still unbounded by design); the
// store itself is now streamed. Best-effort throughout: a datum failure never fails the replace.
await TryStoreWaveformDatumsAsync(entryKey, ct);
// Write the new duration to SQL. The vault bytes are already swapped, so this is the
// authoritative metadata update for the replace. A failure here is surfaced (unlike the
// best-effort waveform regen above) because a stale DurationSeconds silently corrupts
// derived aggregates (e.g. MixRuntimeSeconds on the home stats endpoint).
var durationWrite = await _sqlTrackService.SetDuration(trackId, newAudio.Duration, ct);
var durationWrite = await _sqlTrackService.SetDuration(trackId, newDuration.Value, ct);
if (!durationWrite.Success)
{
var error = durationWrite.Messages.FirstOrDefault()?.Message ?? "Unknown error";
@@ -178,6 +178,46 @@ public class FileDatabase : DirectoryIndexDirectory, IDisposable
return false;
}
/// <summary>
/// Stores a resource in the named vault by streaming it: <paramref name="writeContent"/> emits the
/// bytes to the backing stream and <paramref name="metaData"/> is handed to the vault as the index
/// metadata. Never throws for a failed store — every exception raised while storing is caught here
/// and reported as <c>false</c>, so callers must check the returned bool.
/// </summary>
/// <param name="vaultId">Identifier of the vault to store into.</param>
/// <param name="entryId">Identifier of the entry being stored.</param>
/// <param name="metaData">Index metadata passed through to the vault.</param>
/// <param name="writeContent">Callback that writes the resource bytes to the stream it is given.</param>
/// <param name="cancellationToken">Forwarded to the vault's streaming add.</param>
/// <returns>
/// <c>true</c> when the vault exists and the streamed add completed; <c>false</c> when the vault is
/// missing, the operation was cancelled, or the add failed.
/// </returns>
public async Task<bool> RegisterResourceStreamingAsync(
    string vaultId,
    string entryId,
    MetaData metaData,
    Func<Stream, CancellationToken, Task> writeContent,
    CancellationToken cancellationToken = default)
{
    try
    {
        var vault = _vaults.Get(vaultId);
        if (vault == null)
        {
            return false;
        }

        var byteCount = await vault.AddEntryStreamingAsync(entryId, metaData, writeContent, cancellationToken);
        _logger.LogInformation(
            "Streamed {Bytes} bytes into vault {VaultId} entry {EntryId} (no whole-file buffer).",
            byteCount, vaultId, entryId);
        return true;
    }
    catch (OperationCanceledException)
    {
        // A cancellation is not an error condition: report failure to the caller but keep it out
        // of the error log.
        return false;
    }
    catch (Exception ex)
    {
        // Genuine failure: log it, then report it through the return value rather than rethrowing.
        _logger.LogError(ex, "RegisterResourceStreamingAsync failed for vault {VaultId} entry {EntryId}", vaultId, entryId);
        return false;
    }
}
/// <summary>
/// Removes a resource from a specific vault. Returns null if the vault does not exist,
/// false if the entry was not found, true if the entry was removed. Distinguishing
@@ -56,6 +56,63 @@ public abstract class MediaVault : VaultIndexDirectory
await FileUtils.PutFileAsync(mediaPath, buffer);
}
/// <summary>
/// Streams an entry's bytes into the vault without materializing the whole file in memory. The
/// metadata is supplied by the caller; the bytes are produced by <paramref name="writeContent"/>.
///
/// Write ordering: the bytes are streamed to a randomly named <c>.tmp</c> file under
/// <c>RootPath</c>, flushed through to the storage device, moved over the final backing-file path
/// (overwriting any existing file at that path), and only then is the index updated. Consequently
/// the index is never updated before the backing file is in place, and a failure or cancellation
/// during the write leaves any existing backing file and the index untouched.
/// </summary>
/// <param name="entryId">Key of the entry; used to derive the backing-file path and the index entry.</param>
/// <param name="metaData">Index metadata; its <c>Extension</c> determines the backing-file path.</param>
/// <param name="writeContent">Callback that writes the entry's bytes to the supplied stream.</param>
/// <param name="cancellationToken">Passed to <paramref name="writeContent"/> and to the buffer flush.</param>
/// <returns>The length of the written file in bytes.</returns>
/// <remarks>
/// Any exception (including cancellation) is rethrown after a best-effort delete of the temp file.
/// </remarks>
public async Task<long> AddEntryStreamingAsync(
    string entryId,
    MetaData metaData,
    Func<Stream, CancellationToken, Task> writeContent,
    CancellationToken cancellationToken = default)
{
    var finalPath = GetMediaPathFromEntryKey(entryId, metaData.Extension);
    var tempPath = Path.Combine(RootPath, Path.GetRandomFileName() + ".tmp");
    try
    {
        long bytesWritten;
        await using (var tempStream = new FileStream(
            tempPath, FileMode.CreateNew, FileAccess.Write, FileShare.None,
            bufferSize: 81920, useAsync: true))
        {
            await writeContent(tempStream, cancellationToken);
            await tempStream.FlushAsync(cancellationToken);

            // FlushAsync only drains the managed buffer to the operating system. Force the OS
            // buffers to the device as well, so the file that gets renamed into place (and then
            // advertised by the index) has its contents on disk rather than only in page cache.
            tempStream.Flush(flushToDisk: true);

            bytesWritten = tempStream.Length;
        }

        // Move into place, replacing any existing file at the same path (the replace path when
        // the extension is unchanged).
        File.Move(tempPath, finalPath, overwrite: true);

        // Index last: a failure between the move and this call leaves a file the index does not
        // reference, never an index entry without a file.
        await AddToIndexAsync(entryId, metaData);
        return bytesWritten;
    }
    catch
    {
        // Best-effort cleanup. After a successful move the temp path no longer exists and this is
        // a no-op; after a failed or cancelled write it removes the partial file. Cleanup errors
        // are deliberately ignored so the original exception is the one that propagates.
        try { if (File.Exists(tempPath)) File.Delete(tempPath); } catch { /* best-effort */ }
        throw;
    }
}
/// <summary>
/// Retrieves an entry from the vault (MediaVaultType inferred from T)
/// </summary>
+205 -63
View File
@@ -7,12 +7,22 @@ namespace DeepDrftContent.Processors;
/// </summary>
public class AudioProcessor
{
// Header parsing never needs the audio body. Read the file in 64 KB steps until the data-chunk
// header is locatable, capping the window so a pathological file with an enormous pre-data header
// cannot drive an unbounded allocation — such a file simply falls through to default metadata and
// passthrough storage, the same outcome as any unparseable WAV.
private const int HeaderWindowStep = 64 * 1024;
private const int HeaderWindowCap = 8 * 1024 * 1024;
/// <summary>
/// Processes a WAV file and creates an AudioBinary object
/// Processes a WAV file into a <see cref="ProcessedAudio"/> store plan: extracts metadata from a
/// bounded header window (never the whole file) and returns a streamed writer for the canonical
/// vault bytes. Standard PCM is stored verbatim (passthrough copy); EXTENSIBLE-PCM / IEEE-float /
/// padded-container WAVs are normalized to a plain 44-byte standard-PCM WAV, written progressively
/// so the vault only ever holds a format the streaming pipeline already handles.
/// </summary>
/// <param name="filePath">Path to the WAV file</param>
/// <returns>AudioBinary object with metadata</returns>
public async Task<AudioBinary?> ProcessWavFileAsync(string filePath)
public async Task<ProcessedAudio?> ProcessWavFileAsync(string filePath, CancellationToken cancellationToken = default)
{
if (!File.Exists(filePath))
{
@@ -26,30 +36,197 @@ public class AudioProcessor
try
{
var buffer = await File.ReadAllBytesAsync(filePath);
var wavInfo = ExtractWavMetadata(buffer);
var fileLength = new FileInfo(filePath).Length;
var window = await ReadWavHeaderWindowAsync(filePath, cancellationToken);
var wavInfo = ExtractWavMetadata(window);
// EXTENSIBLE-PCM is byte-compatible with standard PCM but carries a 40+ byte fmt chunk
// the streaming pipeline never expects. Normalize to a plain 44-byte PCM WAV at storage
// time so the vault only ever holds standard PCM and the client decode path stays unchanged.
var storedBuffer = wavInfo.IsExtensible ? NormalizeToStandardPcm(buffer, wavInfo) : buffer;
if (!wavInfo.IsExtensible)
{
// Standard PCM (or the default-fallback path, which reports IsExtensible = false):
// the source bytes are already a format the pipeline handles, so store them verbatim.
return ProcessedAudio.Passthrough(filePath, ".wav", wavInfo.Duration, wavInfo.Bitrate, fileLength);
}
var parameters = new AudioBinaryParams(
Buffer: storedBuffer,
Size: storedBuffer.Length,
Extension: ".wav",
Duration: wavInfo.Duration,
Bitrate: wavInfo.Bitrate
);
// EXTENSIBLE → streamed normalization. The output data size is derivable from the source
// data size alone (no body read needed): verbatim keeps it, float drops 1 byte per sample
// (4→3), padded keeps only the valid bytes per container sample.
var dataStart = (long)wavInfo.DataChunkPos + 8;
var available = fileLength - dataStart;
var srcDataSize = Math.Min((long)wavInfo.DataSize, available);
return new AudioBinary(parameters);
NormalizeMode mode;
int outBitsPerSample;
long outDataSize;
int containerBytes = 0;
int validBytes = 0;
if (wavInfo.IsFloat)
{
mode = NormalizeMode.Float;
outBitsPerSample = 24;
outDataSize = (srcDataSize / 4) * 3;
}
else if (wavInfo.IsPaddedContainer)
{
mode = NormalizeMode.Padded;
outBitsPerSample = wavInfo.BitsPerSample;
containerBytes = wavInfo.ContainerBitsPerSample / 8;
validBytes = wavInfo.BitsPerSample / 8;
outDataSize = (srcDataSize / containerBytes) * validBytes;
}
else
{
mode = NormalizeMode.Verbatim;
outBitsPerSample = wavInfo.BitsPerSample;
outDataSize = srcDataSize;
}
var channels = wavInfo.Channels;
var sampleRate = wavInfo.SampleRate;
return new ProcessedAudio(
".wav", wavInfo.Duration, wavInfo.Bitrate, 44 + outDataSize,
(destination, ct) => WriteNormalizedWavAsync(
filePath, dataStart, srcDataSize, channels, sampleRate, outBitsPerSample,
outDataSize, mode, containerBytes, validBytes, destination, ct));
}
catch (Exception ex)
catch (Exception ex) when (ex is not OperationCanceledException)
{
throw new InvalidOperationException($"Failed to process WAV file: {ex.Message}", ex);
}
}
/// <summary>
/// Reads a leading window of the file for header parsing instead of the whole file. The window
/// grows by up to <see cref="HeaderWindowStep"/> bytes per read and is returned as soon as
/// <c>FindChunk</c> can locate the "data" chunk in it; otherwise reading stops at end of file or
/// once the window has reached <see cref="HeaderWindowCap"/> bytes, and whatever was read is returned.
/// </summary>
/// <param name="filePath">Path of the file to read from.</param>
/// <param name="ct">Cancellation token passed to each read.</param>
/// <returns>The bytes read from the start of the file.</returns>
private static async Task<byte[]> ReadWavHeaderWindowAsync(string filePath, CancellationToken ct)
{
    await using var source = new FileStream(
        filePath, FileMode.Open, FileAccess.Read, FileShare.Read,
        bufferSize: HeaderWindowStep, useAsync: true);
    using var window = new MemoryStream();
    var step = new byte[HeaderWindowStep];

    // Keep extending the window while it is under the cap and the file still yields bytes.
    for (int got;
         window.Length < HeaderWindowCap && (got = await source.ReadAsync(step, ct)) != 0;)
    {
        window.Write(step, 0, got);

        // A negative FindChunk result means the data chunk is not locatable in the bytes read so
        // far; a non-negative one means the window is sufficient and can be returned as-is.
        var snapshot = window.ToArray();
        if (FindChunk(snapshot, "data") >= 0)
        {
            return snapshot;
        }
    }

    return window.ToArray();
}
/// <summary>
/// Emits a normalized WAV to <paramref name="destination"/>: first the header produced by
/// <c>BuildStandardPcmHeader</c>, then up to <paramref name="srcDataSize"/> bytes of the source's
/// data region (starting at <paramref name="dataStart"/>), copied or transformed chunk by chunk
/// according to <paramref name="mode"/>.
/// </summary>
/// <param name="sourcePath">File the data region is read from.</param>
/// <param name="dataStart">Byte offset in the source at which the data region begins.</param>
/// <param name="srcDataSize">Number of source data bytes to process.</param>
/// <param name="channels">Channel count written into the header.</param>
/// <param name="sampleRate">Sample rate written into the header.</param>
/// <param name="outBitsPerSample">Bit depth written into the header.</param>
/// <param name="outDataSize">Data size written into the header.</param>
/// <param name="mode">Selects verbatim copy, float conversion, or padded-container repacking.</param>
/// <param name="containerBytes">Bytes per source sample container (used by the padded mode).</param>
/// <param name="validBytes">Valid bytes per sample (used by the padded mode).</param>
/// <param name="destination">Stream that receives the header and body.</param>
/// <param name="ct">Cancellation token passed to every read and write.</param>
private async Task WriteNormalizedWavAsync(
    string sourcePath, long dataStart, long srcDataSize,
    int channels, int sampleRate, int outBitsPerSample, long outDataSize,
    NormalizeMode mode, int containerBytes, int validBytes,
    Stream destination, CancellationToken ct)
{
    // Header first; the body follows directly behind it.
    var pcmHeader = BuildStandardPcmHeader(channels, sampleRate, outBitsPerSample, outDataSize);
    await destination.WriteAsync(pcmHeader, ct);

    await using var source = new FileStream(
        sourcePath, FileMode.Open, FileAccess.Read, FileShare.Read,
        bufferSize: 81920, useAsync: true);
    source.Seek(dataStart, SeekOrigin.Begin);

    if (mode == NormalizeMode.Verbatim)
    {
        // Sample bytes are copied unchanged.
        await CopyBoundedAsync(source, destination, srcDataSize, ct);
    }
    else if (mode == NormalizeMode.Float)
    {
        // Processed in 4-byte units (one float sample each), converted to 24-bit PCM.
        await TransformBoundedAsync(source, destination, srcDataSize, unit: 4,
            transform: (chunk, count) => ConvertFloatTo24BitPcm(chunk, 0, count), ct);
    }
    else if (mode == NormalizeMode.Padded)
    {
        // Processed in whole containers; the repack helper takes the widths in bits.
        await TransformBoundedAsync(source, destination, srcDataSize, unit: containerBytes,
            transform: (chunk, count) => RepackPaddedContainer(chunk, 0, count, containerBytes * 8, validBytes * 8), ct);
    }
    // Any other mode value writes the header only.
}
/// <summary>
/// Copies up to <paramref name="totalBytes"/> bytes from <paramref name="src"/> to
/// <paramref name="dest"/> through a fixed 81920-byte buffer. Stops early, without error, if the
/// source ends before <paramref name="totalBytes"/> have been read.
/// </summary>
/// <param name="src">Stream to read from, starting at its current position.</param>
/// <param name="dest">Stream to write to.</param>
/// <param name="totalBytes">Maximum number of bytes to copy; zero or less copies nothing.</param>
/// <param name="ct">Cancellation token passed to every read and write.</param>
private static async Task CopyBoundedAsync(Stream src, Stream dest, long totalBytes, CancellationToken ct)
{
    var chunk = new byte[81920];

    for (var left = totalBytes; left > 0;)
    {
        // Never ask for more than is still owed, so the copy cannot overrun totalBytes.
        var slice = chunk.AsMemory(0, (int)Math.Min(chunk.Length, left));
        var got = await src.ReadAsync(slice, ct);
        if (got == 0)
        {
            return; // source exhausted early
        }

        await dest.WriteAsync(slice.Slice(0, got), ct);
        left -= got;
    }
}
/// <summary>
/// Reads up to <paramref name="totalBytes"/> bytes from <paramref name="src"/> and writes the
/// result of <paramref name="transform"/> for each run of whole <paramref name="unit"/>-byte
/// groups to <paramref name="dest"/>. Bytes that do not yet complete a group are moved to the
/// front of the buffer and completed by the next read; a trailing incomplete group is dropped.
/// Stops early if the source ends.
/// </summary>
/// <param name="src">Stream to read from, starting at its current position.</param>
/// <param name="dest">Stream that receives the transformed bytes.</param>
/// <param name="totalBytes">Maximum number of source bytes to consume.</param>
/// <param name="unit">Group size in bytes; the transform is only ever given a multiple of it.</param>
/// <param name="transform">
/// Receives the working buffer and the count of leading bytes to convert; returns the bytes to write.
/// </param>
/// <param name="ct">Cancellation token passed to every read and write.</param>
private static async Task TransformBoundedAsync(
    Stream src, Stream dest, long totalBytes, int unit,
    Func<byte[], int, byte[]> transform, CancellationToken ct)
{
    // Working buffer: the largest multiple of unit not exceeding 81920, but at least one unit.
    var capacity = Math.Max(unit, (81920 / unit) * unit);
    var work = new byte[capacity];
    var pending = 0; // bytes at the front of the buffer left over from the previous pass

    for (var left = totalBytes; left > 0;)
    {
        var request = (int)Math.Min(capacity - pending, left);
        if (request == 0)
        {
            break;
        }

        var got = await src.ReadAsync(work.AsMemory(pending, request), ct);
        if (got == 0)
        {
            break;
        }

        left -= got;

        var available = pending + got;
        var complete = (available / unit) * unit; // bytes forming whole groups
        if (complete > 0)
        {
            await dest.WriteAsync(transform(work, complete), ct);
        }

        // Shift the incomplete tail to the front so the next read appends to it.
        pending = available - complete;
        if (pending > 0)
        {
            Array.Copy(work, complete, work, 0, pending);
        }
    }
}
/// <summary>
/// Selects how the normalized WAV writer handles the source data region: copied unchanged, or passed
/// through one of the two sample transforms.
/// </summary>
private enum NormalizeMode
{
/// <summary>
/// Sample bytes already standard PCM (EXTENSIBLE-PCM, depth == container width); the data region
/// is copied without modification.
/// </summary>
Verbatim,
/// <summary>IEEE float samples converted to 24-bit PCM, processed in 4-byte units.</summary>
Float,
/// <summary>
/// Padded container (e.g. 24-in-32) re-packed to the valid depth, processed one container at a time.
/// </summary>
Padded
}
/// <summary>
/// Extracts the raw PCM data region and format parameters from a WAV buffer, reusing the
/// same chunk-walk and validation as metadata extraction. Returns null if the buffer is not
@@ -317,50 +494,17 @@ public class AudioProcessor
}
/// <summary>
/// Rebuilds an EXTENSIBLE WAV as a canonical 44-byte-header standard PCM WAV (audioFormat = 1)
/// so the vault only ever holds a format the streaming pipeline already handles. Three source
/// shapes are normalized:
/// <list type="bullet">
/// <item>EXTENSIBLE-PCM (depth == container): sample bytes are byte-identical to standard PCM and
/// copied verbatim; only the header is replaced.</item>
/// <item>IEEE float: 32-bit float samples are converted to 24-bit signed integer PCM.</item>
/// <item>Padded container (e.g. 24-in-32): the padding/sign-extension bytes are stripped, keeping
/// the lowest valid bytes per sample.</item>
/// </list>
/// The output header always reports the valid bit depth (<see cref="WavMetadata.BitsPerSample"/>).
/// Builds the canonical 44-byte standard-PCM WAV header (audioFormat = 1) for a normalized stream.
/// The body is written separately so no whole-file buffer is allocated; this only emits the header
/// the streaming pipeline expects, reporting the valid (post-normalization) bit depth.
/// </summary>
private byte[] NormalizeToStandardPcm(byte[] buffer, WavMetadata metadata)
private static byte[] BuildStandardPcmHeader(int channels, int sampleRate, int outBitsPerSample, long dataSize)
{
// Clamp the declared data size to what is actually present; some encoders overshoot.
var dataStart = metadata.DataChunkPos + 8;
var available = buffer.Length - dataStart;
var srcDataSize = Math.Min(metadata.DataSize, available);
byte[] dataBytes;
int outBitsPerSample;
if (metadata.IsFloat)
{
dataBytes = ConvertFloatTo24BitPcm(buffer, dataStart, srcDataSize);
outBitsPerSample = 24;
}
else if (metadata.IsPaddedContainer)
{
dataBytes = RepackPaddedContainer(buffer, dataStart, srcDataSize, metadata.ContainerBitsPerSample, metadata.BitsPerSample);
outBitsPerSample = metadata.BitsPerSample;
}
else
{
dataBytes = new byte[srcDataSize];
Array.Copy(buffer, dataStart, dataBytes, 0, srcDataSize);
outBitsPerSample = metadata.BitsPerSample;
}
var dataSize = dataBytes.Length;
const int headerSize = 44;
var result = new byte[headerSize + dataSize];
var result = new byte[headerSize];
var blockAlign = (ushort)(metadata.Channels * (outBitsPerSample / 8));
var byteRate = (uint)(metadata.SampleRate * blockAlign);
var blockAlign = (ushort)(channels * (outBitsPerSample / 8));
var byteRate = (uint)(sampleRate * blockAlign);
// RIFF header
System.Text.Encoding.ASCII.GetBytes("RIFF").CopyTo(result, 0);
@@ -371,8 +515,8 @@ public class AudioProcessor
System.Text.Encoding.ASCII.GetBytes("fmt ").CopyTo(result, 12);
BitConverter.GetBytes((uint)16).CopyTo(result, 16);
BitConverter.GetBytes((ushort)1).CopyTo(result, 20); // audioFormat = PCM
BitConverter.GetBytes((ushort)metadata.Channels).CopyTo(result, 22);
BitConverter.GetBytes((uint)metadata.SampleRate).CopyTo(result, 24);
BitConverter.GetBytes((ushort)channels).CopyTo(result, 22);
BitConverter.GetBytes((uint)sampleRate).CopyTo(result, 24);
BitConverter.GetBytes(byteRate).CopyTo(result, 28);
BitConverter.GetBytes(blockAlign).CopyTo(result, 32);
BitConverter.GetBytes((ushort)outBitsPerSample).CopyTo(result, 34);
@@ -381,8 +525,6 @@ public class AudioProcessor
System.Text.Encoding.ASCII.GetBytes("data").CopyTo(result, 36);
BitConverter.GetBytes((uint)dataSize).CopyTo(result, 40);
Array.Copy(dataBytes, 0, result, headerSize, dataSize);
return result;
}
@@ -459,7 +601,7 @@ public class AudioProcessor
/// <summary>
/// Finds a chunk in the WAV file buffer with proper alignment handling
/// </summary>
private int FindChunk(byte[] buffer, string chunkId)
private static int FindChunk(byte[] buffer, string chunkId)
{
var chunkBytes = System.Text.Encoding.ASCII.GetBytes(chunkId);
int offset = 12; // Start after RIFF header
@@ -24,18 +24,18 @@ public class AudioProcessorRouter
}
/// <summary>
/// Processes <paramref name="filePath"/> with the processor matching its extension, returning an
/// <see cref="AudioBinary"/> carrying the stored bytes and extracted metadata. Throws
/// <see cref="ArgumentException"/> for unsupported extensions.
/// Processes <paramref name="filePath"/> with the processor matching its extension, returning a
/// <see cref="ProcessedAudio"/> store plan (extracted metadata plus a streamed writer for the
/// canonical vault bytes). Throws <see cref="ArgumentException"/> for unsupported extensions.
/// </summary>
public async Task<AudioBinary?> ProcessAudioFileAsync(string filePath)
public async Task<ProcessedAudio?> ProcessAudioFileAsync(string filePath, CancellationToken cancellationToken = default)
{
var ext = Path.GetExtension(filePath).ToLowerInvariant();
return ext switch
{
".wav" => await _wavProcessor.ProcessWavFileAsync(filePath),
".mp3" => await _mp3Processor.ProcessMp3FileAsync(filePath),
".flac" => await _flacProcessor.ProcessFlacFileAsync(filePath),
".wav" => await _wavProcessor.ProcessWavFileAsync(filePath, cancellationToken),
".mp3" => await _mp3Processor.ProcessMp3FileAsync(filePath, cancellationToken),
".flac" => await _flacProcessor.ProcessFlacFileAsync(filePath, cancellationToken),
_ => throw new ArgumentException($"Unsupported audio format: {ext}", nameof(filePath)),
};
}
@@ -0,0 +1,58 @@
namespace DeepDrftContent.Processors;
/// <summary>
/// Bounded-buffer file helpers for the audio store path. Neither helper holds a whole file in
/// memory: the copy moves a fixed-size buffer at a time, and the prefix read allocates at most
/// the requested cap.
/// </summary>
internal static class AudioStoreStream
{
    private const int CopyBufferSize = 81920; // 80 KB

    /// <summary>
    /// Copies the entire file at <paramref name="sourcePath"/> into <paramref name="destination"/>
    /// through a single <see cref="CopyBufferSize"/>-byte buffer. Hand-rolled so that every read
    /// and write is at most <see cref="CopyBufferSize"/> bytes, independent of the file size.
    /// </summary>
    /// <param name="sourcePath">File to read.</param>
    /// <param name="destination">Stream that receives the file's bytes.</param>
    /// <param name="ct">Cancellation token passed to every read and write.</param>
    public static async Task CopyFileAsync(string sourcePath, Stream destination, CancellationToken ct)
    {
        await using var src = new FileStream(
            sourcePath, FileMode.Open, FileAccess.Read, FileShare.Read,
            bufferSize: CopyBufferSize, useAsync: true);

        var buffer = new byte[CopyBufferSize];
        int read;
        while ((read = await src.ReadAsync(buffer, ct)) > 0)
        {
            await destination.WriteAsync(buffer.AsMemory(0, read), ct);
        }
    }

    /// <summary>
    /// Reads at most <paramref name="cap"/> bytes from the start of <paramref name="path"/>, for
    /// header/metadata parsing that must not load the whole file. The allocation is bounded by
    /// <c>min(cap, fileLength, Array.MaxLength)</c>. Callers that need the true file size must
    /// obtain it separately — the returned array's length is only the prefix length.
    /// </summary>
    /// <param name="path">File to read.</param>
    /// <param name="cap">Maximum number of bytes to read; must not be negative.</param>
    /// <param name="ct">Cancellation token passed to every read.</param>
    /// <returns>The bytes read; shorter than requested if the file ends early.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="cap"/> is negative.</exception>
    public static async Task<byte[]> ReadPrefixAsync(string path, long cap, CancellationToken ct)
    {
        if (cap < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(cap), cap, "The prefix cap must not be negative.");
        }

        await using var fs = new FileStream(
            path, FileMode.Open, FileAccess.Read, FileShare.Read,
            bufferSize: CopyBufferSize, useAsync: true);

        // Clamp in 64-bit before narrowing: casting min(cap, fileLength) straight to int wraps
        // when both exceed int.MaxValue, yielding a negative or silently wrong prefix length.
        var length = (int)Math.Min(Math.Min(cap, fs.Length), (long)Array.MaxLength);
        var buffer = new byte[length];

        // A single read may return fewer bytes than requested, so loop until the prefix is full
        // or the file ends.
        var total = 0;
        while (total < length)
        {
            var read = await fs.ReadAsync(buffer.AsMemory(total, length - total), ct);
            if (read == 0)
                break;
            total += read;
        }

        return total == length ? buffer : buffer[..total];
    }
}
@@ -12,7 +12,11 @@ public class FlacAudioProcessor
private const double FallbackDuration = 180.0;
private const int FallbackBitrate = 1411;
public async Task<AudioBinary?> ProcessFlacFileAsync(string filePath)
// STREAMINFO is mandatory and always the first metadata block, immediately after the 4-byte magic
// (data at offset 8, 34 bytes). A small prefix read covers it without loading the body.
private const long HeaderCap = 64 * 1024;
public async Task<ProcessedAudio?> ProcessFlacFileAsync(string filePath, CancellationToken cancellationToken = default)
{
if (!File.Exists(filePath))
{
@@ -24,25 +28,21 @@ public class FlacAudioProcessor
throw new ArgumentException("File must be a FLAC file", nameof(filePath));
}
var buffer = await File.ReadAllBytesAsync(filePath);
var meta = ExtractFlacMetadata(buffer);
var fileLength = new FileInfo(filePath).Length;
var window = await AudioStoreStream.ReadPrefixAsync(filePath, HeaderCap, cancellationToken);
var meta = ExtractFlacMetadata(window, fileLength);
var parameters = new AudioBinaryParams(
Buffer: buffer,
Size: buffer.Length,
Extension: ".flac",
Duration: meta.Duration,
Bitrate: meta.Bitrate);
return new AudioBinary(parameters);
// FLAC is stored unmodified — passthrough the original bytes via a streamed disk-to-disk copy.
return ProcessedAudio.Passthrough(filePath, ".flac", meta.Duration, meta.Bitrate, fileLength);
}
/// <summary>
/// Validates the <c>fLaC</c> magic and the leading STREAMINFO block, then computes duration from
/// total-samples / sample-rate and average bitrate from file size. On any parse failure, logs a
/// warning and returns synthetic defaults — never throws.
/// warning and returns synthetic defaults — never throws. <paramref name="fileLength"/> is the true
/// file size (the header window may be shorter), used for the average-bitrate computation.
/// </summary>
private static FlacMetadata ExtractFlacMetadata(byte[] buffer)
private static FlacMetadata ExtractFlacMetadata(byte[] buffer, long fileLength)
{
try
{
@@ -84,7 +84,7 @@ public class FlacAudioProcessor
var duration = (double)totalSamples / sampleRate;
var bitrate = duration > 0
? (int)(buffer.LongLength * 8L / (duration * 1000))
? (int)(fileLength * 8L / (duration * 1000))
: FallbackBitrate;
return new FlacMetadata { Duration = duration, Bitrate = bitrate };
+20 -17
View File
@@ -25,7 +25,13 @@ public class Mp3AudioProcessor
private const double FallbackDuration = 180.0;
private const int FallbackBitrate = 320;
public async Task<AudioBinary?> ProcessMp3FileAsync(string filePath)
// Metadata lives in the leading ID3v2 tag plus the first MPEG frame. Cap the header read so a
// large MP3 is not pulled into memory whole just to read it; a tag larger than this (very large
// embedded art) simply falls back to the CBR/default estimate, never an OOM. The body is stored
// by streaming the original file, not from this window.
private const long HeaderCap = 8 * 1024 * 1024;
public async Task<ProcessedAudio?> ProcessMp3FileAsync(string filePath, CancellationToken cancellationToken = default)
{
if (!File.Exists(filePath))
{
@@ -37,24 +43,21 @@ public class Mp3AudioProcessor
throw new ArgumentException("File must be an MP3 file", nameof(filePath));
}
var buffer = await File.ReadAllBytesAsync(filePath);
var meta = ExtractMp3Metadata(buffer);
var fileLength = new FileInfo(filePath).Length;
var window = await AudioStoreStream.ReadPrefixAsync(filePath, HeaderCap, cancellationToken);
var meta = ExtractMp3Metadata(window, fileLength);
var parameters = new AudioBinaryParams(
Buffer: buffer,
Size: buffer.Length,
Extension: ".mp3",
Duration: meta.Duration,
Bitrate: meta.Bitrate);
return new AudioBinary(parameters);
// MP3 is stored unmodified — passthrough the original bytes via a streamed disk-to-disk copy.
return ProcessedAudio.Passthrough(filePath, ".mp3", meta.Duration, meta.Bitrate, fileLength);
}
/// <summary>
/// Parses the first valid MPEG frame (after any ID3v2 tag) and any Xing/VBRI tag inside it.
/// On any parse failure, logs a warning and returns synthetic defaults — never throws.
/// <paramref name="fileLength"/> is the true file size (the header window may be shorter), used
/// for the CBR duration estimate.
/// </summary>
private static Mp3Metadata ExtractMp3Metadata(byte[] buffer)
private static Mp3Metadata ExtractMp3Metadata(byte[] buffer, long fileLength)
{
try
{
@@ -65,7 +68,7 @@ public class Mp3AudioProcessor
}
var header = DecodeFrameHeader(buffer, frameStart);
var duration = ComputeDuration(buffer, frameStart, header);
var duration = ComputeDuration(buffer, frameStart, header, fileLength);
return new Mp3Metadata { Duration = duration, Bitrate = header.BitrateKbps };
}
@@ -202,7 +205,7 @@ public class Mp3AudioProcessor
/// Computes duration from a Xing/Info or VBRI tag (accurate for VBR) when present; otherwise
/// falls back to the CBR estimate fileSize / (bitrate_kbps * 125). Guards divide-by-zero.
/// </summary>
private static double ComputeDuration(byte[] buffer, int frameStart, FrameHeader header)
private static double ComputeDuration(byte[] buffer, int frameStart, FrameHeader header, long fileLength)
{
var xingFrames = ReadXingFrameCount(buffer, frameStart, header);
if (xingFrames > 0 && header.SampleRate > 0)
@@ -216,10 +219,10 @@ public class Mp3AudioProcessor
return (double)vbriFrames * header.SamplesPerFrame / header.SampleRate;
}
// CBR fallback: bitrate_kbps * 1000 / 8 bytes per second = bitrate_kbps * 125.
// Exclude the ID3v2 tag bytes (everything before frameStart) from the estimate.
// CBR fallback: bitrate_kbps * 1000 / 8 bytes per second = bitrate_kbps * 125. Uses the true
// file length (not the bounded header window), excluding the ID3v2 tag bytes before frameStart.
var bytesPerSecond = header.BitrateKbps * 125;
return bytesPerSecond > 0 ? (double)(buffer.Length - frameStart) / bytesPerSecond : FallbackDuration;
return bytesPerSecond > 0 ? (double)(fileLength - frameStart) / bytesPerSecond : FallbackDuration;
}
/// <summary>
@@ -0,0 +1,68 @@
namespace DeepDrftContent.Processors;
/// <summary>
/// The result of processing an audio file for storage: the extracted metadata (extension,
/// duration, bitrate, stored size) plus a writer delegate that emits the bytes to be stored to a
/// destination stream. The bytes themselves are never held by this object — they are produced
/// only when <see cref="WriteToAsync"/> runs.
/// </summary>
/// <remarks>
/// A plan built by <see cref="Passthrough"/> re-opens the source file when it is written, so the
/// source file must still exist at the time <see cref="WriteToAsync"/> is called.
/// </remarks>
public sealed class ProcessedAudio
{
    /// <summary>The stored file extension (e.g. <c>.wav</c>, <c>.mp3</c>, <c>.flac</c>).</summary>
    public string Extension { get; }

    /// <summary>Audio duration in seconds.</summary>
    public double Duration { get; }

    /// <summary>Audio bitrate in kbps.</summary>
    public int Bitrate { get; }

    /// <summary>The stored byte count, as supplied by the processor that built this plan.</summary>
    public long Size { get; }

    private readonly Func<Stream, CancellationToken, Task> _writeTo;

    /// <summary>Creates a store plan from metadata and a writer delegate.</summary>
    /// <param name="extension">Stored file extension.</param>
    /// <param name="duration">Duration in seconds.</param>
    /// <param name="bitrate">Bitrate in kbps.</param>
    /// <param name="size">Stored byte count.</param>
    /// <param name="writeTo">Delegate that writes the bytes to be stored to the given stream.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="extension"/> or <paramref name="writeTo"/> is null.
    /// </exception>
    public ProcessedAudio(
        string extension,
        double duration,
        int bitrate,
        long size,
        Func<Stream, CancellationToken, Task> writeTo)
    {
        // Fail at construction rather than later inside WriteToAsync, where a null delegate would
        // surface as a NullReferenceException far from the code that built the plan.
        Extension = extension ?? throw new ArgumentNullException(nameof(extension));
        Duration = duration;
        Bitrate = bitrate;
        Size = size;
        _writeTo = writeTo ?? throw new ArgumentNullException(nameof(writeTo));
    }

    /// <summary>Invokes the writer delegate to emit the stored bytes to <paramref name="destination"/>.</summary>
    /// <param name="destination">Stream that receives the bytes.</param>
    /// <param name="cancellationToken">Forwarded to the writer delegate.</param>
    public Task WriteToAsync(Stream destination, CancellationToken cancellationToken = default)
        => _writeTo(destination, cancellationToken);

    /// <summary>
    /// Builds a passthrough plan: the stored bytes are the source file's bytes, written by
    /// <c>AudioStoreStream.CopyFileAsync</c>, and <see cref="Size"/> is <paramref name="sourceLength"/>.
    /// </summary>
    /// <param name="sourcePath">File whose bytes are stored unchanged.</param>
    /// <param name="extension">Stored file extension.</param>
    /// <param name="duration">Duration in seconds.</param>
    /// <param name="bitrate">Bitrate in kbps.</param>
    /// <param name="sourceLength">Length of the source file in bytes.</param>
    public static ProcessedAudio Passthrough(
        string sourcePath, string extension, double duration, int bitrate, long sourceLength)
        => new(extension, duration, bitrate, sourceLength,
            (destination, ct) => AudioStoreStream.CopyFileAsync(sourcePath, destination, ct));
}
+40 -30
View File
@@ -40,13 +40,15 @@ public class TrackContentService
string? album = null,
string? genre = null,
DateOnly? releaseDate = null,
string? originalFileName = null)
string? originalFileName = null,
CancellationToken cancellationToken = default)
{
try
{
// Process the audio file (routed by extension)
var audioBinary = await _audioProcessorRouter.ProcessAudioFileAsync(audioFilePath);
if (audioBinary == null)
// Process the audio file (routed by extension). The returned plan carries metadata plus a
// streamed writer — no whole-file buffer (the store-path OOM fix).
var processed = await _audioProcessorRouter.ProcessAudioFileAsync(audioFilePath, cancellationToken);
if (processed == null)
{
throw new InvalidOperationException("Failed to process audio file");
}
@@ -60,8 +62,11 @@ public class TrackContentService
await _fileDatabase.CreateVaultAsync(VaultConstants.Tracks, MediaVaultType.Audio);
}
// Store the audio in FileDatabase
var success = await _fileDatabase.RegisterResourceAsync(VaultConstants.Tracks, trackId, audioBinary);
// Stream the audio into the vault. The metadata is supplied directly (there is no in-memory
// AudioBinary on this path), and the bytes are written progressively from the staging file.
var metaData = MetaDataFactory.CreateAudioMetaData(trackId, processed.Extension, processed.Duration, processed.Bitrate);
var success = await _fileDatabase.RegisterResourceStreamingAsync(
VaultConstants.Tracks, trackId, metaData, processed.WriteToAsync, cancellationToken);
if (!success)
{
throw new InvalidOperationException("Failed to store audio in FileDatabase");
@@ -77,7 +82,7 @@ public class TrackContentService
OriginalFileName = originalFileName,
// Persist the processor-extracted runtime to SQL so aggregate queries (total mix runtime)
// need not touch the vault. Same value the high-res waveform compute reads downstream.
DurationSeconds = audioBinary.Duration
DurationSeconds = processed.Duration
};
return trackEntity;
@@ -100,34 +105,37 @@ public class TrackContentService
string? album = null,
string? genre = null,
DateOnly? releaseDate = null,
string? originalFileName = null) =>
AddTrackAsync(wavFilePath, trackName, artist, album, genre, releaseDate, originalFileName);
string? originalFileName = null,
CancellationToken cancellationToken = default) =>
AddTrackAsync(wavFilePath, trackName, artist, album, genre, releaseDate, originalFileName, cancellationToken);
/// <summary>
/// Swaps the audio bytes for an existing track in place: processes a new audio file and
/// re-registers it under the SAME <paramref name="entryKey"/> in the tracks vault. The track's
/// vault key — and therefore its SQL link, release membership, position, and metadata — is
/// untouched; only the binary changes. The new audio is written first; only on confirmed success
/// is a stale old backing file cleaned up. A cross-format replacement (e.g. .wav → .flac) leaves
/// the old file on disk under its former filename once the index is updated; the post-success
/// cleanup removes it. For a same-extension overwrite the register alone suffices — the file is
/// written in place. If the register fails the original audio is left intact and null is returned,
/// so the track remains playable. Returns the freshly stored <see cref="AudioBinary"/> on success
/// (so the caller can regenerate waveform data from the same bytes) — matching the FileDatabase
/// swallow-and-return-null contract.
/// untouched; only the binary changes. The new audio is streamed to the vault first; only on
/// confirmed success is a stale old backing file cleaned up. A cross-format replacement (e.g.
/// .wav → .flac) leaves the old file on disk under its former filename once the index is updated;
/// the post-success cleanup removes it. For a same-extension overwrite the register alone suffices.
/// If the register fails the original audio is left intact and null is returned, so the track
/// remains playable. Returns the freshly stored audio's <b>duration</b> on success (the caller
/// re-reads the vault for waveform regen and uses this for the SQL duration write) — matching the
/// FileDatabase swallow-and-return-null contract. The new bytes are never materialized in memory.
/// </summary>
public async Task<AudioBinary?> ReplaceTrackAudioAsync(string entryKey, string audioFilePath)
public async Task<double?> ReplaceTrackAudioAsync(string entryKey, string audioFilePath, CancellationToken cancellationToken = default)
{
try
{
// Capture the old extension before touching the vault. After register the index
// will point to the new extension, so we need the old value now to detect a
// cross-format swap and clean up the stale file post-success.
var existing = await _fileDatabase.LoadResourceAsync<AudioBinary>(VaultConstants.Tracks, entryKey);
var oldExtension = existing?.Extension;
// Capture the old extension from the index metadata (not by loading the file — that would
// pull the whole old audio into memory). After register the index points to the new
// extension, so we need the old value now to detect a cross-format swap and clean up the
// stale file post-success.
var trackVault = _fileDatabase.GetVault(VaultConstants.Tracks);
var existingMeta = trackVault is null ? null : await trackVault.GetEntryMetadata(entryKey);
var oldExtension = existingMeta?.Extension;
var audioBinary = await _audioProcessorRouter.ProcessAudioFileAsync(audioFilePath);
if (audioBinary == null)
var processed = await _audioProcessorRouter.ProcessAudioFileAsync(audioFilePath, cancellationToken);
if (processed == null)
{
Console.WriteLine($"TrackContentService.ReplaceTrackAudioAsync: processing returned null for {entryKey}");
return null;
@@ -138,9 +146,11 @@ public class TrackContentService
await _fileDatabase.CreateVaultAsync(VaultConstants.Tracks, MediaVaultType.Audio);
}
// Register the new audio. This upserts the index entry (new extension recorded) and
// writes the new file to disk. If this fails the original entry and file are untouched.
var success = await _fileDatabase.RegisterResourceAsync(VaultConstants.Tracks, entryKey, audioBinary);
// Stream the new audio in. This upserts the index entry (new extension recorded) and writes
// the new file to disk. If this fails the original entry and file are untouched.
var metaData = MetaDataFactory.CreateAudioMetaData(entryKey, processed.Extension, processed.Duration, processed.Bitrate);
var success = await _fileDatabase.RegisterResourceStreamingAsync(
VaultConstants.Tracks, entryKey, metaData, processed.WriteToAsync, cancellationToken);
if (!success)
{
Console.WriteLine($"TrackContentService.ReplaceTrackAudioAsync: vault write failed for {entryKey}; original audio preserved");
@@ -153,7 +163,7 @@ public class TrackContentService
// old path — RemoveResourceAsync would now resolve to the new extension and delete the
// wrong file. Non-fatal: an orphaned old file is a disk-hygiene concern, not a
// playback issue (the index no longer references it).
if (oldExtension != null && oldExtension != audioBinary.Extension)
if (oldExtension != null && oldExtension != processed.Extension)
{
var vault = _fileDatabase.GetVault(VaultConstants.Tracks);
if (vault != null)
@@ -172,7 +182,7 @@ public class TrackContentService
}
}
return audioBinary;
return processed.Duration;
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
+28 -10
View File
@@ -32,13 +32,18 @@ public class AudioProcessorTests
[Test]
public async Task StandardPcm_RoundTripsUnchanged()
{
var path = await WriteWavAsync(BuildMinimalWav(channels: 2, sampleRate: 44100, bitsPerSample: 16, audioFormat: WaveFormatPcm));
var source = BuildMinimalWav(channels: 2, sampleRate: 44100, bitsPerSample: 16, audioFormat: WaveFormatPcm);
var path = await WriteWavAsync(source);
var audio = await new AudioProcessor().ProcessWavFileAsync(path);
Assert.That(audio, Is.Not.Null);
Assert.That(audio!.Duration, Is.GreaterThan(0.0));
Assert.That(audio.Bitrate, Is.GreaterThan(0));
// Standard PCM is passthrough: the streamed bytes must be byte-identical to the source file.
var stored = await Materialize(audio);
Assert.That(stored, Is.EqualTo(source), "Standard PCM must be stored byte-identical (passthrough)");
Assert.That(audio.Size, Is.EqualTo(source.Length), "Passthrough Size must equal the source length");
}
[Test]
@@ -54,7 +59,8 @@ public class AudioProcessorTests
Assert.That(audio, Is.Not.Null);
Assert.That(audio!.Duration, Is.GreaterThan(0.0));
Assert.That(audio.Bitrate, Is.GreaterThan(0));
Assert.That(ReadFmtAudioFormat(audio.Buffer), Is.EqualTo(WaveFormatPcm), "Stored buffer must be standard PCM");
var stored = await Materialize(audio);
Assert.That(ReadFmtAudioFormat(stored), Is.EqualTo(WaveFormatPcm), "Stored buffer must be standard PCM");
}
[Test]
@@ -70,17 +76,19 @@ public class AudioProcessorTests
var audio = await new AudioProcessor().ProcessWavFileAsync(path);
Assert.That(audio, Is.Not.Null);
Assert.That(ReadFmtBitsPerSample(audio!.Buffer), Is.EqualTo(16 + 8), "Float must convert to 24-bit PCM");
Assert.That(ReadFmtAudioFormat(audio.Buffer), Is.EqualTo(WaveFormatPcm));
var stored = await Materialize(audio!);
Assert.That(ReadFmtBitsPerSample(stored), Is.EqualTo(16 + 8), "Float must convert to 24-bit PCM");
Assert.That(ReadFmtAudioFormat(stored), Is.EqualTo(WaveFormatPcm));
// 4 float samples (4 bytes each) → 4 PCM samples (3 bytes each) = 12 data bytes after the 44-byte header.
Assert.That(audio.Buffer.Length, Is.EqualTo(44 + 12));
Assert.That(stored.Length, Is.EqualTo(44 + 12));
Assert.That(audio!.Size, Is.EqualTo(44 + 12), "Computed Size must match the streamed byte count");
// Verify the converted sample values: (int)(sample * 8388607.0), clamped, little-endian 3 bytes.
// 0.5f → 4194303 = 0x3FFFFF → FF FF 3F
// -0.5f → -4194303 = 0xFFC00001 → 24-bit LE: 01 00 C0
// 1.0f → 8388607 = 0x7FFFFF → FF FF 7F
// -1.0f → -8388607 = 0xFF800001 → 24-bit LE: 01 00 80
var expectedData = new byte[] { 0xFF, 0xFF, 0x3F, 0x01, 0x00, 0xC0, 0xFF, 0xFF, 0x7F, 0x01, 0x00, 0x80 };
var actualData = audio.Buffer[44..];
var actualData = stored[44..];
Assert.That(actualData, Is.EqualTo(expectedData), "Float samples must be converted to 24-bit PCM correctly");
}
@@ -97,17 +105,18 @@ public class AudioProcessorTests
var audio = await new AudioProcessor().ProcessWavFileAsync(path);
Assert.That(audio, Is.Not.Null);
Assert.That(ReadFmtBitsPerSample(audio!.Buffer), Is.EqualTo(24), "Padded container must repack to 24-bit");
Assert.That(ReadFmtAudioFormat(audio.Buffer), Is.EqualTo(WaveFormatPcm));
var stored = await Materialize(audio!);
Assert.That(ReadFmtBitsPerSample(stored), Is.EqualTo(24), "Padded container must repack to 24-bit");
Assert.That(ReadFmtAudioFormat(stored), Is.EqualTo(WaveFormatPcm));
// 4 container samples (4 bytes each) → 4 PCM samples (3 bytes each) = 12 data bytes.
Assert.That(audio.Buffer.Length, Is.EqualTo(44 + 12));
Assert.That(stored.Length, Is.EqualTo(44 + 12));
// Verify the repacked sample values: lowest 3 bytes of each 4-byte little-endian container.
// 0x123456 → LE 4 bytes: 56 34 12 00 → keep 3: 56 34 12
// 0xFFEDCBA9 → LE 4 bytes: A9 CB ED FF → keep 3: A9 CB ED
// 0x000001 → LE 4 bytes: 01 00 00 00 → keep 3: 01 00 00
// 0xFF800000 → LE 4 bytes: 00 00 80 FF → keep 3: 00 00 80
var expectedData = new byte[] { 0x56, 0x34, 0x12, 0xA9, 0xCB, 0xED, 0x01, 0x00, 0x00, 0x00, 0x00, 0x80 };
var actualData = audio.Buffer[44..];
var actualData = stored[44..];
Assert.That(actualData, Is.EqualTo(expectedData), "Padded 24-in-32 samples must strip the padding byte correctly");
}
@@ -265,6 +274,15 @@ public class AudioProcessorTests
// -- helpers --------------------------------------------------------------------------------
/// <summary>Streams a <see cref="ProcessedAudio"/> store plan into memory so its canonical bytes
/// can be asserted — the store path no longer hands back a materialized buffer.</summary>
private static async Task<byte[]> Materialize(ProcessedAudio audio)
{
    // Run the plan's writer against an in-memory sink, then snapshot everything it wrote.
    using (var sink = new MemoryStream())
    {
        await audio.WriteToAsync(sink);
        return sink.ToArray();
    }
}
/// <summary>
/// Synthesises a minimal valid MPEG1 Layer III CBR MP3 buffer: one frame header plus enough body
/// bytes for the frame, with an optional Xing VBR header in the side-information region. The body
+478
View File
@@ -0,0 +1,478 @@
using System.Text;
using DeepDrftContent;
using DeepDrftContent.Constants;
using DeepDrftContent.FileDatabase.Models;
using DeepDrftContent.FileDatabase.Services;
using DeepDrftContent.Processors;
using FileDb = DeepDrftContent.FileDatabase.Services.FileDatabase;
namespace DeepDrftTests;
/// <summary>
/// Tests for the streamed audio store path (Wave 1 OOM fix): processors emit a
/// <see cref="ProcessedAudio"/> plan whose body is written to the vault without ever materializing
/// the whole file in a managed <c>byte[]</c>. Covers passthrough byte-identity (standard-PCM WAV,
/// MP3, FLAC), streamed WAV normalization (EXTENSIBLE), the new streaming vault register round-trip,
/// and the memory-bounding contract of the store primitive (bounded-buffer, sequential, forward-only
/// writes — never a single whole-file write).
/// </summary>
[TestFixture]
public class AudioStoreStreamingTests
{
private const ushort WaveFormatPcm = 0x0001;
private const ushort WaveFormatExtensible = 0xFFFE;
private string _testDir = string.Empty;
[SetUp]
public void SetUp()
{
    // Every test gets its own GUID-named directory under the system temp path.
    var uniqueFolder = Guid.NewGuid().ToString();
    _testDir = Path.Combine(Path.GetTempPath(), "AudioStoreStreamingTests", uniqueFolder);
    Directory.CreateDirectory(_testDir);
}
[TearDown]
public void TearDown()
{
    try
    {
        Directory.Delete(_testDir, recursive: true);
    }
    catch
    {
        // Best-effort cleanup: a failed delete must not fail the test.
    }
}
/// <summary>Builds a router wired with the WAV, MP3 and FLAC processors.</summary>
private static AudioProcessorRouter Router()
{
    var wav = new AudioProcessor();
    var mp3 = new Mp3AudioProcessor();
    var flac = new FlacAudioProcessor();
    return new AudioProcessorRouter(wav, mp3, flac);
}
/// <summary>Builds a <see cref="TrackContentService"/> over <paramref name="db"/> with a fresh router.</summary>
private static TrackContentService Content(FileDb db)
{
    return new TrackContentService(db, Router());
}
// -- End-to-end store byte-identity (passthrough) -----------------------------------------
[Test]
public async Task StandardPcmWav_StoredByteIdenticalToSource()
{
    // Arrange: a standard-PCM WAV staged on disk and a content service over a fresh database.
    var wavBytes = BuildPcmWav(channels: 2, sampleRate: 44100, bitsPerSample: 16, dataBytes: 200_000);
    var stagingPath = await WriteAsync(wavBytes, ".wav");
    var database = await FileDb.FromAsync(_testDir);
    var service = Content(database!);

    // Act: store the track, then read it back by the key it was stored under.
    var track = await service.AddTrackAsync(stagingPath, "Track", "Artist");
    Assert.That(track, Is.Not.Null);
    var roundTripped = await service.GetAudioBinaryAsync(track!.EntryKey);

    // Assert: the stored bytes equal the source bytes and the duration was extracted.
    Assert.That(roundTripped, Is.Not.Null);
    Assert.That(roundTripped!.Buffer, Is.EqualTo(wavBytes), "Standard PCM must be stored byte-identical");
    Assert.That(roundTripped.Duration, Is.GreaterThan(0.0));
}
[Test]
public async Task Mp3_StoredByteIdenticalToSource_MetadataCorrect()
{
    // Arrange: a single-frame MP3 staged on disk and a content service over a fresh database.
    var source = BuildMinimalMp3();
    var path = await WriteAsync(source, ".mp3");
    var db = await FileDb.FromAsync(_testDir);
    var content = Content(db!);

    // Act: store the track, then read it back by its entry key.
    var entity = await content.AddTrackAsync(path, "Track", "Artist");
    Assert.That(entity, Is.Not.Null);
    var stored = await content.GetAudioBinaryAsync(entity!.EntryKey);

    // The null check stays OUTSIDE Assert.Multiple: inside the block a failed assertion does
    // not stop execution, so the dereferences below would throw NullReferenceException and
    // obscure the real failure.
    Assert.That(stored, Is.Not.Null);

    Assert.Multiple(() =>
    {
        Assert.That(stored!.Extension, Is.EqualTo(".mp3"));
        Assert.That(stored.Buffer, Is.EqualTo(source), "MP3 must be stored byte-identical (no transcode)");
        Assert.That(stored.Bitrate, Is.EqualTo(128));
    });
}
[Test]
public async Task Flac_StoredByteIdenticalToSource_MetadataCorrect()
{
    // Arrange: a minimal FLAC staged on disk and a content service over a fresh database.
    var source = BuildMinimalFlac();
    var path = await WriteAsync(source, ".flac");
    var db = await FileDb.FromAsync(_testDir);
    var content = Content(db!);

    // Act: store the track, then read it back by its entry key.
    var entity = await content.AddTrackAsync(path, "Track", "Artist");
    Assert.That(entity, Is.Not.Null);
    var stored = await content.GetAudioBinaryAsync(entity!.EntryKey);

    // The null check stays OUTSIDE Assert.Multiple: inside the block a failed assertion does
    // not stop execution, so the dereferences below would throw NullReferenceException and
    // obscure the real failure.
    Assert.That(stored, Is.Not.Null);

    Assert.Multiple(() =>
    {
        Assert.That(stored!.Extension, Is.EqualTo(".flac"));
        Assert.That(stored.Buffer, Is.EqualTo(source), "FLAC must be stored byte-identical (no transcode)");
        Assert.That(stored.Duration, Is.GreaterThan(0.0));
    });
}
// -- Streamed normalization ---------------------------------------------------------------
[Test]
public async Task ExtensibleFloatWav_StoredAsNormalizedStandardPcm()
{
    // 40 000 float samples = 160 000 input bytes; the test expects 3 output bytes per sample
    // (120 000) after the 44-byte header. The input is larger than 80 KB so the transform has
    // to span more than one chunk.
    var floatSamples = BuildFloatRamp(sampleCount: 40_000);
    var extensibleWav = BuildExtensibleWav(
        channels: 2,
        sampleRate: 44100,
        containerBits: 32,
        validBits: 32,
        subFormatTag: 0x0003,
        sampleData: floatSamples);
    var stagingPath = await WriteAsync(extensibleWav, ".wav");

    var database = await FileDb.FromAsync(_testDir);
    var service = Content(database!);

    var track = await service.AddTrackAsync(stagingPath, "Track", "Artist");
    Assert.That(track, Is.Not.Null);

    var roundTripped = await service.GetAudioBinaryAsync(track!.EntryKey);
    Assert.That(roundTripped, Is.Not.Null);

    Assert.Multiple(() =>
    {
        // Offsets 20 and 34 are the audioFormat and bitsPerSample fields of a 44-byte WAV header.
        Assert.That(BitConverter.ToUInt16(roundTripped!.Buffer, 20), Is.EqualTo(WaveFormatPcm),
            "EXTENSIBLE must be normalized to standard PCM (audioFormat = 1)");
        Assert.That(BitConverter.ToUInt16(roundTripped.Buffer, 34), Is.EqualTo(24), "Float must normalize to 24-bit");
        Assert.That(roundTripped.Buffer.Length, Is.EqualTo(44 + (floatSamples.Length / 4) * 3),
            "Output size = 44-byte header + 3 bytes per float sample");
    });
}
// -- Streaming vault register round-trip ---------------------------------------------------
[Test]
public async Task RegisterResourceStreamingAsync_RoundTrips()
{
    // Arrange: a fresh database with the tracks vault, and a deterministic 50 000-byte payload.
    var db = await FileDb.FromAsync(_testDir);
    await db!.CreateVaultAsync(VaultConstants.Tracks, MediaVaultType.Audio);
    var payload = Enumerable.Range(0, 50_000).Select(i => (byte)(i % 256)).ToArray();
    var meta = MetaDataFactory.CreateAudioMetaData("entry-1", ".wav", 12.5, 1411);

    // Act: register through the streaming API, writing the payload straight to the destination.
    var ok = await db.RegisterResourceStreamingAsync(
        VaultConstants.Tracks, "entry-1", meta,
        (dest, ct) => dest.WriteAsync(payload, ct).AsTask());
    Assert.That(ok, Is.True);

    var loaded = await db.LoadResourceAsync<AudioBinary>(VaultConstants.Tracks, "entry-1");

    // The null check stays OUTSIDE Assert.Multiple: inside the block a failed assertion does
    // not stop execution, so the dereferences below would throw NullReferenceException and
    // obscure the real failure.
    Assert.That(loaded, Is.Not.Null);

    Assert.Multiple(() =>
    {
        Assert.That(loaded!.Buffer, Is.EqualTo(payload), "Streamed bytes must round-trip exactly");
        Assert.That(loaded.Duration, Is.EqualTo(12.5));
        Assert.That(loaded.Bitrate, Is.EqualTo(1411));
    });
}
[Test]
public async Task RegisterResourceStreamingAsync_UnknownVault_ReturnsFalse()
{
    // No vault is created here, so the register targets a vault that does not exist.
    var database = await FileDb.FromAsync(_testDir);
    var metadata = MetaDataFactory.CreateAudioMetaData("e", ".wav", 1.0, 1411);

    var registered = await database!.RegisterResourceStreamingAsync(
        "does-not-exist",
        "e",
        metadata,
        (dest, ct) => Task.CompletedTask);

    Assert.That(registered, Is.False, "Register into a missing vault must swallow and return false");
}
// -- Memory-bounding of the store primitive -----------------------------------------------
[Test]
public async Task WriteToAsync_StreamsInBoundedSequentialChunks_NotOneWholeFileWrite()
{
    // A passthrough body of several hundred KB: writing it in one call would show up as a
    // single write as large as the file, whereas chunked streaming shows many small writes.
    var wavBytes = BuildPcmWav(channels: 2, sampleRate: 44100, bitsPerSample: 16, dataBytes: 600_000);
    var stagingPath = await WriteAsync(wavBytes, ".wav");

    var plan = await new AudioProcessor().ProcessWavFileAsync(stagingPath);
    Assert.That(plan, Is.Not.Null);

    // The probe discards the data and only records how it was written.
    var recorder = new BoundedWriteProbeStream();
    await plan!.WriteToAsync(recorder);

    Assert.Multiple(() =>
    {
        Assert.That(recorder.TotalBytes, Is.EqualTo(wavBytes.Length), "All bytes must be written");
        Assert.That(recorder.WriteCount, Is.GreaterThan(1), "Body must be streamed in multiple chunks");
        // 81920 is the per-write ceiling this test enforces (presumably the store primitive's
        // copy-buffer size; confirm against the implementation).
        Assert.That(recorder.MaxWriteSize, Is.LessThanOrEqualTo(81920),
            "No single write may exceed the bounded buffer — i.e. the whole file is never buffered");
    });
}
[Test]
public async Task WriteToAsync_NormalizedFloat_StreamsToForwardOnlyStream()
{
    // The destination probe cannot seek or read, so the writer can only succeed by writing
    // the destination strictly front to back.
    var floatSamples = BuildFloatRamp(sampleCount: 30_000);
    var extensibleWav = BuildExtensibleWav(
        channels: 2,
        sampleRate: 44100,
        containerBits: 32,
        validBits: 32,
        subFormatTag: 0x0003,
        sampleData: floatSamples);
    var stagingPath = await WriteAsync(extensibleWav, ".wav");

    var plan = await new AudioProcessor().ProcessWavFileAsync(stagingPath);
    Assert.That(plan, Is.Not.Null);

    var recorder = new BoundedWriteProbeStream();
    await plan!.WriteToAsync(recorder);

    Assert.Multiple(() =>
    {
        // Expected output: 44-byte header plus 3 bytes for every 4-byte float sample.
        Assert.That(recorder.TotalBytes, Is.EqualTo(44 + (floatSamples.Length / 4) * 3));
        Assert.That(recorder.WriteCount, Is.GreaterThan(1), "Header + multiple transformed body chunks");
        Assert.That(recorder.MaxWriteSize, Is.LessThanOrEqualTo(81920));
    });
}
// -- atomic-write safety (cancel / fault on replace path) ---------------------------------
/// <summary>
/// A client disconnect (OperationCanceledException) during the streamed write must leave any
/// pre-existing backing file byte-identical. The atomic temp→rename ordering ensures the rename
/// never happens on an incomplete write, so the original file is never truncated or overwritten.
/// </summary>
[Test]
public async Task AddEntryStreamingAsync_CancelMidWrite_OriginalBackingFileUnchanged()
{
    const string entryId = "replace-target";

    var vault = await AudioVault.FromAsync(_testDir);
    Assert.That(vault, Is.Not.Null);

    // Seed the entry whose backing file has to survive the cancelled replace.
    var seedBytes = new byte[] { 0xAA, 0xBB, 0xCC, 0xDD, 0xEE };
    var seedMeta = MetaDataFactory.CreateAudioMetaData(entryId, ".wav", 10.0, 1411);
    await vault!.AddEntryStreamingAsync(entryId, seedMeta,
        async (s, ct) => await s.WriteAsync(seedBytes, ct));

    var backingPath = Path.Combine(_testDir, "replace-target.wav");
    Assert.That(File.Exists(backingPath), Is.True, "Pre-condition: original backing file must exist");
    var bytesBefore = await File.ReadAllBytesAsync(backingPath);
    Assert.That(bytesBefore, Is.EqualTo(seedBytes), "Pre-condition: backing file must hold original bytes");

    // Replace attempt: the writer emits the first 100 bytes, cancels the token, then throws.
    using var cts = new CancellationTokenSource();
    var replacementBytes = Enumerable.Repeat((byte)0xFF, 10_000).ToArray();
    var replacementMeta = MetaDataFactory.CreateAudioMetaData(entryId, ".wav", 5.0, 1411);

    Assert.ThrowsAsync<OperationCanceledException>(async () =>
        await vault.AddEntryStreamingAsync(entryId, replacementMeta,
            async (s, ct) =>
            {
                await s.WriteAsync(replacementBytes.AsMemory(0, 100), ct);
                await cts.CancelAsync();
                ct.ThrowIfCancellationRequested(); // raises the cancellation through the token
            },
            cts.Token));

    // The seeded file must be untouched: same length, same bytes.
    var bytesAfter = await File.ReadAllBytesAsync(backingPath);
    Assert.That(bytesAfter, Is.EqualTo(seedBytes),
        "Original backing file must be byte-identical after a cancelled replace");

    // Nothing with a .tmp extension may be left in the vault directory.
    var leftovers = Directory.GetFiles(_testDir, "*.tmp");
    Assert.That(leftovers, Is.Empty, "Temp file must be cleaned up after cancel");
}
/// <summary>
/// An I/O fault during the streamed write must leave any pre-existing backing file intact.
/// The atomic temp→rename ordering ensures a faulting write never reaches rename, so the
/// original file is never touched.
/// </summary>
[Test]
public async Task AddEntryStreamingAsync_FaultMidWrite_OriginalBackingFileUnchanged()
{
    const string entryId = "fault-target";

    var vault = await AudioVault.FromAsync(_testDir);
    Assert.That(vault, Is.Not.Null);

    // Seed the entry whose backing file has to survive the faulting replace.
    var seedBytes = new byte[] { 0x01, 0x02, 0x03, 0x04, 0x05 };
    var seedMeta = MetaDataFactory.CreateAudioMetaData(entryId, ".wav", 10.0, 1411);
    await vault!.AddEntryStreamingAsync(entryId, seedMeta,
        async (s, ct) => await s.WriteAsync(seedBytes, ct));

    var backingPath = Path.Combine(_testDir, "fault-target.wav");
    var bytesBefore = await File.ReadAllBytesAsync(backingPath);
    Assert.That(bytesBefore, Is.EqualTo(seedBytes), "Pre-condition: backing file must hold original bytes");

    // Replace attempt whose writer throws before writing anything.
    Assert.ThrowsAsync<IOException>(async () =>
        await vault.AddEntryStreamingAsync(entryId, seedMeta,
            (_, _) => throw new IOException("Simulated I/O fault")));

    // The seeded file must be untouched.
    var bytesAfter = await File.ReadAllBytesAsync(backingPath);
    Assert.That(bytesAfter, Is.EqualTo(seedBytes),
        "Original backing file must be byte-identical after a faulting replace");

    // Nothing with a .tmp extension may be left in the vault directory.
    var leftovers = Directory.GetFiles(_testDir, "*.tmp");
    Assert.That(leftovers, Is.Empty, "Temp file must be cleaned up after fault");
}
// -- builders -----------------------------------------------------------------------------
/// <summary>
/// Writes <paramref name="bytes"/> to a new, uniquely named file in the test directory and
/// returns its path.
/// </summary>
/// <param name="bytes">File contents.</param>
/// <param name="extension">Extension appended to the generated name, including the dot.</param>
private async Task<string> WriteAsync(byte[] bytes, string extension)
{
    var fileName = Guid.NewGuid().ToString("N") + extension;
    var fullPath = Path.Combine(_testDir, fileName);
    await File.WriteAllBytesAsync(fullPath, bytes);
    return fullPath;
}
/// <summary>
/// Builds a WAV file image: a 44-byte header with a 16-byte <c>fmt </c> chunk tagged as standard
/// PCM, followed by <paramref name="dataBytes"/> bytes of a repeating 0..250 pattern.
/// </summary>
private static byte[] BuildPcmWav(int channels, int sampleRate, int bitsPerSample, int dataBytes)
{
    // Deterministic body: byte i holds i mod 251.
    var payload = new byte[dataBytes];
    for (var index = 0; index < payload.Length; index++)
    {
        payload[index] = (byte)(index % 251);
    }

    var frameBytes = (ushort)(channels * (bitsPerSample / 8));
    var bytesPerSecond = (uint)(sampleRate * frameBytes);

    using var image = new MemoryStream();
    using var writer = new BinaryWriter(image, Encoding.ASCII, leaveOpen: true);

    // RIFF container header; the size field counts everything after itself.
    writer.Write(Encoding.ASCII.GetBytes("RIFF"));
    writer.Write((uint)(36 + payload.Length));
    writer.Write(Encoding.ASCII.GetBytes("WAVE"));

    // fmt chunk (16 bytes).
    writer.Write(Encoding.ASCII.GetBytes("fmt "));
    writer.Write(16u);
    writer.Write(WaveFormatPcm);
    writer.Write((ushort)channels);
    writer.Write((uint)sampleRate);
    writer.Write(bytesPerSecond);
    writer.Write(frameBytes);
    writer.Write((ushort)bitsPerSample);

    // data chunk.
    writer.Write(Encoding.ASCII.GetBytes("data"));
    writer.Write((uint)payload.Length);
    writer.Write(payload);

    writer.Flush();
    return image.ToArray();
}
/// <summary>
/// Builds a WAV file image whose 40-byte <c>fmt </c> chunk is tagged WAVE_FORMAT_EXTENSIBLE,
/// followed by a data chunk holding <paramref name="sampleData"/> verbatim. Only the first two
/// bytes of the 16-byte sub-format field are set (to <paramref name="subFormatTag"/>); the
/// remaining 14 are zero.
/// </summary>
private static byte[] BuildExtensibleWav(
    int channels, int sampleRate, int containerBits, int validBits, ushort subFormatTag, byte[] sampleData)
{
    const uint fmtChunkSize = 40;
    var frameBytes = (ushort)(channels * (containerBits / 8));
    var bytesPerSecond = (uint)(sampleRate * frameBytes);

    using var image = new MemoryStream();
    using var writer = new BinaryWriter(image, Encoding.ASCII, leaveOpen: true);

    // RIFF container header; the size accounts for the 24 extra fmt bytes beyond the basic 16.
    writer.Write(Encoding.ASCII.GetBytes("RIFF"));
    writer.Write((uint)(36 + (fmtChunkSize - 16) + sampleData.Length));
    writer.Write(Encoding.ASCII.GetBytes("WAVE"));

    // fmt chunk: the 16 basic bytes...
    writer.Write(Encoding.ASCII.GetBytes("fmt "));
    writer.Write(fmtChunkSize);
    writer.Write(WaveFormatExtensible);
    writer.Write((ushort)channels);
    writer.Write((uint)sampleRate);
    writer.Write(bytesPerSecond);
    writer.Write(frameBytes);
    writer.Write((ushort)containerBits);

    // ...then the extension: cbSize, valid bits, channel mask, 16-byte sub-format.
    writer.Write((ushort)22);
    writer.Write((ushort)validBits);
    writer.Write((uint)0);
    // BinaryWriter emits the ushort low byte first, giving the same two leading bytes as
    // setting them by hand; the other 14 sub-format bytes stay zero.
    writer.Write(subFormatTag);
    writer.Write(new byte[14]);

    // data chunk.
    writer.Write(Encoding.ASCII.GetBytes("data"));
    writer.Write((uint)sampleData.Length);
    writer.Write(sampleData);

    writer.Flush();
    return image.ToArray();
}
/// <summary>
/// Builds <paramref name="sampleCount"/> 32-bit float samples as raw little-endian bytes. The
/// values form a sawtooth that repeats every 200 samples and ranges over [-0.5, 0.5).
/// </summary>
/// <param name="sampleCount">Number of float samples to emit.</param>
/// <returns>A buffer of <c>sampleCount * 4</c> bytes.</returns>
private static byte[] BuildFloatRamp(int sampleCount)
{
    var bytes = new byte[sampleCount * 4];
    for (var i = 0; i < sampleCount; i++)
    {
        var sample = (float)((i % 200) / 200.0 - 0.5); // a deterministic ramp in [-0.5, 0.5)

        // WAV sample data is little-endian. Write it explicitly instead of relying on the
        // host byte order (BitConverter.GetBytes), and write in place to avoid allocating a
        // temporary 4-byte array per sample.
        System.Buffers.Binary.BinaryPrimitives.WriteSingleLittleEndian(
            bytes.AsSpan(i * 4, sizeof(float)), sample);
    }
    return bytes;
}
/// <summary>
/// Builds a 417-byte buffer holding a 4-byte MPEG1 Layer III frame header (128 kbps, 44.1 kHz,
/// stereo, no CRC) followed by zero bytes.
/// </summary>
private static byte[] BuildMinimalMp3()
{
    const int frameSize = 417;       // floor(144 * 128000 / 44100)
    const int bitrateIndex = 9;      // 128 kbps
    const int sampleRateIndex = 0;   // 44.1 kHz

    byte[] header =
    {
        0xFF,                                                // sync
        0xFB,                                                // sync + MPEG1 + Layer III + no CRC
        (byte)((bitrateIndex << 4) | (sampleRateIndex << 2)),
        0x00,                                                // stereo
    };

    // A fresh array is zero-filled, so everything after the header is silence.
    var frame = new byte[frameSize];
    header.CopyTo(frame, 0);
    return frame;
}
/// <summary>
/// Builds a FLAC file image: the <c>fLaC</c> marker, one metadata block header flagged as the
/// last block (STREAMINFO, 34 bytes), the STREAMINFO body describing 44.1 kHz / 2 channels /
/// 16-bit / 5 seconds of samples, then 100 000 zero bytes in place of encoded frames.
/// </summary>
private static byte[] BuildMinimalFlac()
{
    const int sampleRate = 44100;
    const int channels = 2;
    const int bitsPerSample = 16;
    const long totalSamples = 44100L * 5;
    const int streamInfoLength = 34;
    const int placeholderFrameBytes = 100_000;

    // Layout: 4-byte marker, 4-byte block header, STREAMINFO body, placeholder frame bytes.
    // A fresh array is zero-filled, so only the non-zero bytes need to be set.
    var file = new byte[4 + 4 + streamInfoLength + placeholderFrameBytes];
    Encoding.ASCII.GetBytes("fLaC").CopyTo(file, 0);

    file[4] = 0x80;                    // last block, STREAMINFO
    file[7] = (byte)streamInfoLength;  // low byte of the 24-bit block length

    // STREAMINFO bytes 10..17 pack sample rate (20 bits), channels-1 (3 bits),
    // bits-per-sample-1 (5 bits) and total samples (36 bits).
    var info = file.AsSpan(8, streamInfoLength);
    var bps = bitsPerSample - 1;
    info[10] = (byte)((sampleRate >> 12) & 0xFF);
    info[11] = (byte)((sampleRate >> 4) & 0xFF);
    info[12] = (byte)(((sampleRate & 0x0F) << 4) | (((channels - 1) & 0x07) << 1) | ((bps >> 4) & 0x01));
    info[13] = (byte)(((bps & 0x0F) << 4) | (int)((totalSamples >> 32) & 0x0F));
    info[14] = (byte)((totalSamples >> 24) & 0xFF);
    info[15] = (byte)((totalSamples >> 16) & 0xFF);
    info[16] = (byte)((totalSamples >> 8) & 0xFF);
    info[17] = (byte)(totalSamples & 0xFF);

    return file;
}
/// <summary>
/// A write-only, forward-only (non-seekable) stream that records how the store primitive writes:
/// the number of writes, the largest single write, and the total. Proves the body is streamed in
/// bounded, sequential chunks rather than buffered into one whole-file write.
/// </summary>
private sealed class BoundedWriteProbeStream : Stream
{
    /// <summary>Number of write calls observed so far.</summary>
    public int WriteCount { get; private set; }

    /// <summary>Size in bytes of the largest single write observed so far.</summary>
    public int MaxWriteSize { get; private set; }

    /// <summary>Sum of the sizes of all writes observed so far.</summary>
    public long TotalBytes { get; private set; }

    public override bool CanRead
    {
        get { return false; }
    }

    public override bool CanSeek
    {
        get { return false; }
    }

    public override bool CanWrite
    {
        get { return true; }
    }

    // Length and Position both report the running byte total; Position cannot be set.
    public override long Length
    {
        get { return TotalBytes; }
    }

    public override long Position
    {
        get { return TotalBytes; }
        set { throw new NotSupportedException(); }
    }

    // Every write overload funnels here; the data itself is discarded.
    private void Record(int count)
    {
        WriteCount += 1;
        MaxWriteSize = Math.Max(MaxWriteSize, count);
        TotalBytes += count;
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        Record(count);
    }

    public override void Write(ReadOnlySpan<byte> buffer)
    {
        Record(buffer.Length);
    }

    public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
    {
        Record(buffer.Length);
        return ValueTask.CompletedTask;
    }

    public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        Record(count);
        return Task.CompletedTask;
    }

    // Nothing is buffered, so flushing is a no-op.
    public override void Flush()
    {
    }

    public override Task FlushAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    // Reading, seeking and resizing are unsupported: the stream is write-only and forward-only.
    public override int Read(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }
}
}
+14 -11
View File
@@ -68,9 +68,9 @@ public class TrackReplaceAudioTests
var originalDuration = before!.Duration;
var replacement = await WriteWavAsync(BuildMinimalPcmWav(6.0), ".wav");
var newAudio = await content.ReplaceTrackAudioAsync(entryKey, replacement);
var newDuration = await content.ReplaceTrackAudioAsync(entryKey, replacement);
Assert.That(newAudio, Is.Not.Null, "Replace should return the freshly stored audio");
Assert.That(newDuration, Is.Not.Null, "Replace should return the freshly stored audio's duration");
var after = await content.GetAudioBinaryAsync(entryKey);
Assert.Multiple(() =>
@@ -78,8 +78,8 @@ public class TrackReplaceAudioTests
Assert.That(after, Is.Not.Null, "The track must remain retrievable under the same EntryKey");
Assert.That(after!.Duration, Is.GreaterThan(originalDuration),
"The retrieved audio must reflect the longer replacement, not the original");
Assert.That(newAudio!.Duration, Is.EqualTo(after.Duration),
"The returned binary must match what is stored under the key");
Assert.That(newDuration!.Value, Is.EqualTo(after.Duration),
"The returned duration must match what is stored under the key");
});
}
@@ -101,9 +101,9 @@ public class TrackReplaceAudioTests
Assert.That(wavFilesBefore, Is.Not.Empty, "Sanity: the original .wav backing file exists");
var replacement = await WriteFlacAsync();
var newAudio = await content.ReplaceTrackAudioAsync(entryKey, replacement);
var newDuration = await content.ReplaceTrackAudioAsync(entryKey, replacement);
Assert.That(newAudio, Is.Not.Null);
Assert.That(newDuration, Is.Not.Null);
Assert.Multiple(() =>
{
Assert.That(Directory.GetFiles(vaultDir, "*.wav"), Is.Empty,
@@ -135,12 +135,15 @@ public class TrackReplaceAudioTests
Assert.That(staleHighRes, Is.Not.Null);
var replacement = await WriteWavAsync(BuildMinimalPcmWav(20.0), ".wav");
var newAudio = await content.ReplaceTrackAudioAsync(entryKey, replacement);
Assert.That(newAudio, Is.Not.Null);
var newDuration = await content.ReplaceTrackAudioAsync(entryKey, replacement);
Assert.That(newDuration, Is.Not.Null);
// Regen step (mirrors the orchestrator).
Assert.That(await waveforms.ComputeAndStoreAsync(newAudio!.Buffer, entryKey), Is.True);
Assert.That(await waveforms.ComputeAndStoreHighResAsync(newAudio.Buffer, entryKey, newAudio.Duration), Is.True);
// Regen step (mirrors the orchestrator, which re-reads the freshly stored audio from the vault
// rather than consuming an in-memory buffer the streamed store no longer hands back).
var newStored = await content.GetAudioBinaryAsync(entryKey);
Assert.That(newStored, Is.Not.Null);
Assert.That(await waveforms.ComputeAndStoreAsync(newStored!.Buffer, entryKey), Is.True);
Assert.That(await waveforms.ComputeAndStoreHighResAsync(newStored.Buffer, entryKey, newStored.Duration), Is.True);
var freshHighRes = await waveforms.GetProfileAsync(entryKey, VaultConstants.TrackWaveforms);
var freshProfile = await waveforms.GetProfileAsync(entryKey);