fix(api): stream audio store path to eliminate whole-file buffering (OOM)
Processors now emit a ProcessedAudio plan with a streamed writer instead of a whole-file AudioBinary; vault writes stream via RegisterResourceStreamingAsync. Header parsing is bounded. Wave 2 (waveform/Opus) still re-reads the full file by design.
This commit is contained in:
@@ -178,6 +178,42 @@ public class FileDatabase : DirectoryIndexDirectory, IDisposable
|
||||
return false;
|
||||
}
|
||||
|
||||
/// <summary>
/// Stores a resource in the named vault by handing <paramref name="writeContent"/> the vault's
/// backing stream, so the payload never has to exist as one managed <c>byte[]</c>.
/// Mirrors the bool-returning contract of <see cref="RegisterResourceAsync"/>: failures are not
/// propagated, the caller inspects the result.
/// </summary>
/// <param name="vaultId">Identifier used to look the vault up in <c>_vaults</c>.</param>
/// <param name="entryId">Identifier of the entry to create inside that vault.</param>
/// <param name="metaData">Index metadata forwarded to the vault for this entry.</param>
/// <param name="writeContent">Callback that emits the entry's bytes to the stream it is given.</param>
/// <param name="cancellationToken">Forwarded to the vault write.</param>
/// <returns>
/// <c>true</c> when the vault exists and the streamed write completed; <c>false</c> when the vault
/// lookup returns null or any exception is thrown (the exception is logged, not rethrown).
/// </returns>
public async Task<bool> RegisterResourceStreamingAsync(
    string vaultId,
    string entryId,
    MetaData metaData,
    Func<Stream, CancellationToken, Task> writeContent,
    CancellationToken cancellationToken = default)
{
    try
    {
        var targetVault = _vaults.Get(vaultId);
        if (targetVault == null)
        {
            // Unknown vault: nothing to write, report failure without logging.
            return false;
        }

        var byteCount = await targetVault.AddEntryStreamingAsync(entryId, metaData, writeContent, cancellationToken);
        _logger.LogInformation(
            "Streamed {Bytes} bytes into vault {VaultId} entry {EntryId} (no whole-file buffer).",
            byteCount, vaultId, entryId);
        return true;
    }
    catch (Exception ex)
    {
        // Deliberately swallowed so the result stays a plain bool; logged because a failed
        // streamed write may have left a partially written backing file behind.
        _logger.LogError(ex, "RegisterResourceStreamingAsync failed for vault {VaultId} entry {EntryId}", vaultId, entryId);
        return false;
    }
}
|
||||
|
||||
/// <summary>
|
||||
/// Removes a resource from a specific vault. Returns null if the vault does not exist,
|
||||
/// false if the entry was not found, true if the entry was removed. Distinguishing
|
||||
|
||||
@@ -56,6 +56,37 @@ public abstract class MediaVault : VaultIndexDirectory
|
||||
await FileUtils.PutFileAsync(mediaPath, buffer);
|
||||
}
|
||||
|
||||
/// <summary>
/// Streams an entry's bytes into the vault without materializing the whole file in memory:
/// records the supplied <paramref name="metaData"/> in the index, then invokes
/// <paramref name="writeContent"/> to emit bytes directly to the backing <see cref="FileStream"/>.
/// The metadata is supplied by the caller because there is no in-memory <see cref="FileBinary"/>
/// to infer it from.
///
/// Index-then-file ordering matches <see cref="AddEntryAsync"/>; a failure while writing therefore
/// leaves an index entry over a partial/missing file. The caller treats a thrown exception as a
/// failed register.
/// </summary>
/// <param name="entryId">Key of the entry; combined with the metadata extension to build the file path.</param>
/// <param name="metaData">Index metadata for the entry; its <c>Extension</c> selects the file extension.</param>
/// <param name="writeContent">Callback that writes the entry's bytes to the stream it is given.</param>
/// <param name="cancellationToken">Passed to <paramref name="writeContent"/> and to the final flush.</param>
/// <returns>The length of the backing file after the write has been flushed.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="metaData"/> or <paramref name="writeContent"/> is null. Thrown before the index
/// or the file system is touched.
/// </exception>
public async Task<long> AddEntryStreamingAsync(
    string entryId,
    MetaData metaData,
    Func<Stream, CancellationToken, Task> writeContent,
    CancellationToken cancellationToken = default)
{
    // Validate up front: FileMode.Create truncates any existing backing file, so discovering a
    // null callback only after the index write and file open would destroy data for nothing.
    if (metaData is null)
    {
        throw new ArgumentNullException(nameof(metaData));
    }

    if (writeContent is null)
    {
        throw new ArgumentNullException(nameof(writeContent));
    }

    var mediaPath = GetMediaPathFromEntryKey(entryId, metaData.Extension);

    await AddToIndexAsync(entryId, metaData);

    await using var fileStream = new FileStream(
        mediaPath, FileMode.Create, FileAccess.Write, FileShare.None,
        bufferSize: 81920, useAsync: true);
    await writeContent(fileStream, cancellationToken);

    // Flush before reading Length so the reported size reflects everything the callback wrote.
    await fileStream.FlushAsync(cancellationToken);

    return fileStream.Length;
}
|
||||
|
||||
/// <summary>
|
||||
/// Retrieves an entry from the vault (MediaVaultType inferred from T)
|
||||
/// </summary>
|
||||
|
||||
@@ -7,12 +7,22 @@ namespace DeepDrftContent.Processors;
|
||||
/// </summary>
|
||||
public class AudioProcessor
|
||||
{
|
||||
// Header parsing never needs the audio body. Read the file in 64 KB steps until the data-chunk
|
||||
// header is locatable, capping the window so a pathological file with an enormous pre-data header
|
||||
// cannot drive an unbounded allocation — such a file simply falls through to default metadata and
|
||||
// passthrough storage, the same outcome as any unparseable WAV.
|
||||
private const int HeaderWindowStep = 64 * 1024;
|
||||
private const int HeaderWindowCap = 8 * 1024 * 1024;
|
||||
|
||||
/// <summary>
|
||||
/// Processes a WAV file and creates an AudioBinary object
|
||||
/// Processes a WAV file into a <see cref="ProcessedAudio"/> store plan: extracts metadata from a
|
||||
/// bounded header window (never the whole file) and returns a streamed writer for the canonical
|
||||
/// vault bytes. Standard PCM is stored verbatim (passthrough copy); EXTENSIBLE-PCM / IEEE-float /
|
||||
/// padded-container WAVs are normalized to a plain 44-byte standard-PCM WAV, written progressively
|
||||
/// so the vault only ever holds a format the streaming pipeline already handles.
|
||||
/// </summary>
|
||||
/// <param name="filePath">Path to the WAV file</param>
|
||||
/// <returns>AudioBinary object with metadata</returns>
|
||||
public async Task<AudioBinary?> ProcessWavFileAsync(string filePath)
|
||||
public async Task<ProcessedAudio?> ProcessWavFileAsync(string filePath, CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (!File.Exists(filePath))
|
||||
{
|
||||
@@ -26,30 +36,197 @@ public class AudioProcessor
|
||||
|
||||
try
|
||||
{
|
||||
var buffer = await File.ReadAllBytesAsync(filePath);
|
||||
var wavInfo = ExtractWavMetadata(buffer);
|
||||
var fileLength = new FileInfo(filePath).Length;
|
||||
var window = await ReadWavHeaderWindowAsync(filePath, cancellationToken);
|
||||
var wavInfo = ExtractWavMetadata(window);
|
||||
|
||||
// EXTENSIBLE-PCM is byte-compatible with standard PCM but carries a 40+ byte fmt chunk
|
||||
// the streaming pipeline never expects. Normalize to a plain 44-byte PCM WAV at storage
|
||||
// time so the vault only ever holds standard PCM and the client decode path stays unchanged.
|
||||
var storedBuffer = wavInfo.IsExtensible ? NormalizeToStandardPcm(buffer, wavInfo) : buffer;
|
||||
if (!wavInfo.IsExtensible)
|
||||
{
|
||||
// Standard PCM (or the default-fallback path, which reports IsExtensible = false):
|
||||
// the source bytes are already a format the pipeline handles, so store them verbatim.
|
||||
return ProcessedAudio.Passthrough(filePath, ".wav", wavInfo.Duration, wavInfo.Bitrate, fileLength);
|
||||
}
|
||||
|
||||
var parameters = new AudioBinaryParams(
|
||||
Buffer: storedBuffer,
|
||||
Size: storedBuffer.Length,
|
||||
Extension: ".wav",
|
||||
Duration: wavInfo.Duration,
|
||||
Bitrate: wavInfo.Bitrate
|
||||
);
|
||||
// EXTENSIBLE → streamed normalization. The output data size is derivable from the source
|
||||
// data size alone (no body read needed): verbatim keeps it, float drops 1 byte per sample
|
||||
// (4→3), padded keeps only the valid bytes per container sample.
|
||||
var dataStart = (long)wavInfo.DataChunkPos + 8;
|
||||
var available = fileLength - dataStart;
|
||||
var srcDataSize = Math.Min((long)wavInfo.DataSize, available);
|
||||
|
||||
return new AudioBinary(parameters);
|
||||
NormalizeMode mode;
|
||||
int outBitsPerSample;
|
||||
long outDataSize;
|
||||
int containerBytes = 0;
|
||||
int validBytes = 0;
|
||||
if (wavInfo.IsFloat)
|
||||
{
|
||||
mode = NormalizeMode.Float;
|
||||
outBitsPerSample = 24;
|
||||
outDataSize = (srcDataSize / 4) * 3;
|
||||
}
|
||||
else if (wavInfo.IsPaddedContainer)
|
||||
{
|
||||
mode = NormalizeMode.Padded;
|
||||
outBitsPerSample = wavInfo.BitsPerSample;
|
||||
containerBytes = wavInfo.ContainerBitsPerSample / 8;
|
||||
validBytes = wavInfo.BitsPerSample / 8;
|
||||
outDataSize = (srcDataSize / containerBytes) * validBytes;
|
||||
}
|
||||
else
|
||||
{
|
||||
mode = NormalizeMode.Verbatim;
|
||||
outBitsPerSample = wavInfo.BitsPerSample;
|
||||
outDataSize = srcDataSize;
|
||||
}
|
||||
|
||||
var channels = wavInfo.Channels;
|
||||
var sampleRate = wavInfo.SampleRate;
|
||||
|
||||
return new ProcessedAudio(
|
||||
".wav", wavInfo.Duration, wavInfo.Bitrate, 44 + outDataSize,
|
||||
(destination, ct) => WriteNormalizedWavAsync(
|
||||
filePath, dataStart, srcDataSize, channels, sampleRate, outBitsPerSample,
|
||||
outDataSize, mode, containerBytes, validBytes, destination, ct));
|
||||
}
|
||||
catch (Exception ex)
|
||||
catch (Exception ex) when (ex is not OperationCanceledException)
|
||||
{
|
||||
throw new InvalidOperationException($"Failed to process WAV file: {ex.Message}", ex);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
/// Loads just the leading part of a WAV file for metadata parsing. The file is read in
/// <see cref="HeaderWindowStep"/>-sized reads and accumulated until <c>FindChunk</c> can locate
/// the "data" chunk in what has been read so far, the file ends, or the accumulated size reaches
/// <see cref="HeaderWindowCap"/>.
/// </summary>
/// <param name="filePath">Path of the WAV file to read from.</param>
/// <param name="ct">Cancellation token passed to each read.</param>
/// <returns>The bytes accumulated at the point the loop stopped.</returns>
private static async Task<byte[]> ReadWavHeaderWindowAsync(string filePath, CancellationToken ct)
{
    await using var fs = new FileStream(
        filePath, FileMode.Open, FileAccess.Read, FileShare.Read,
        bufferSize: HeaderWindowStep, useAsync: true);

    using var accumulated = new MemoryStream();
    var chunk = new byte[HeaderWindowStep];

    for (;;)
    {
        // Stop growing once the cap is reached; the caller then parses whatever was gathered.
        if (accumulated.Length >= HeaderWindowCap)
        {
            break;
        }

        var got = await fs.ReadAsync(chunk, ct);
        if (got == 0)
        {
            break; // end of file
        }

        accumulated.Write(chunk, 0, got);

        // A negative FindChunk result means the data chunk is not locatable in this window yet,
        // so another read is needed. Typically the first read already contains it.
        var snapshot = accumulated.ToArray();
        if (FindChunk(snapshot, "data") >= 0)
        {
            return snapshot;
        }
    }

    return accumulated.ToArray();
}
|
||||
|
||||
/// <summary>
/// Emits a normalized WAV to <paramref name="destination"/>: first the header produced by
/// <c>BuildStandardPcmHeader</c>, then the source file's data region starting at
/// <paramref name="dataStart"/>, processed chunk by chunk according to <paramref name="mode"/>.
/// </summary>
/// <param name="sourcePath">File the sample data is read from.</param>
/// <param name="dataStart">Byte offset in the source where sample data begins.</param>
/// <param name="srcDataSize">Number of source data bytes to consume.</param>
/// <param name="channels">Channel count written into the header.</param>
/// <param name="sampleRate">Sample rate written into the header.</param>
/// <param name="outBitsPerSample">Bit depth written into the header.</param>
/// <param name="outDataSize">Data size written into the header.</param>
/// <param name="mode">Selects verbatim copy, float conversion, or padded-container repacking.</param>
/// <param name="containerBytes">Bytes per source sample; only used for <see cref="NormalizeMode.Padded"/>.</param>
/// <param name="validBytes">Bytes kept per sample; only used for <see cref="NormalizeMode.Padded"/>.</param>
/// <param name="destination">Stream receiving the header and the normalized data.</param>
/// <param name="ct">Cancellation token passed to every read and write.</param>
private async Task WriteNormalizedWavAsync(
    string sourcePath, long dataStart, long srcDataSize,
    int channels, int sampleRate, int outBitsPerSample, long outDataSize,
    NormalizeMode mode, int containerBytes, int validBytes,
    Stream destination, CancellationToken ct)
{
    // The header goes out before the source is even opened.
    var pcmHeader = BuildStandardPcmHeader(channels, sampleRate, outBitsPerSample, outDataSize);
    await destination.WriteAsync(pcmHeader, ct);

    await using var input = new FileStream(
        sourcePath, FileMode.Open, FileAccess.Read, FileShare.Read,
        bufferSize: 81920, useAsync: true);
    input.Seek(dataStart, SeekOrigin.Begin);

    if (mode == NormalizeMode.Verbatim)
    {
        await CopyBoundedAsync(input, destination, srcDataSize, ct);
    }
    else if (mode == NormalizeMode.Float)
    {
        // Source samples are processed in 4-byte units by the float-to-24-bit converter.
        await TransformBoundedAsync(input, destination, srcDataSize, unit: 4,
            transform: (bytes, count) => ConvertFloatTo24BitPcm(bytes, 0, count), ct);
    }
    else if (mode == NormalizeMode.Padded)
    {
        // Source samples are processed one container-sized unit at a time.
        await TransformBoundedAsync(input, destination, srcDataSize, unit: containerBytes,
            transform: (bytes, count) => RepackPaddedContainer(bytes, 0, count, containerBytes * 8, validBytes * 8), ct);
    }
}
|
||||
|
||||
/// <summary>
/// Copies up to <paramref name="totalBytes"/> bytes from <paramref name="src"/> to
/// <paramref name="dest"/> through a fixed 81920-byte scratch buffer. If the source ends first,
/// the copy stops quietly with fewer bytes written.
/// </summary>
/// <param name="src">Stream to read from, at its current position.</param>
/// <param name="dest">Stream to write to.</param>
/// <param name="totalBytes">Upper bound on the number of bytes copied; zero or negative copies nothing.</param>
/// <param name="ct">Cancellation token passed to each read and write.</param>
private static async Task CopyBoundedAsync(Stream src, Stream dest, long totalBytes, CancellationToken ct)
{
    var scratch = new byte[81920];

    for (var left = totalBytes; left > 0;)
    {
        // Never ask for more than the buffer holds or than is still owed.
        var request = left < scratch.Length ? (int)left : scratch.Length;

        var got = await src.ReadAsync(scratch.AsMemory(0, request), ct);
        if (got == 0)
        {
            return; // source exhausted early
        }

        await dest.WriteAsync(scratch.AsMemory(0, got), ct);
        left -= got;
    }
}
|
||||
|
||||
/// <summary>
/// Reads up to <paramref name="totalBytes"/> bytes from <paramref name="src"/>, passes every run
/// of complete <paramref name="unit"/>-byte samples to <paramref name="transform"/>, and writes
/// the returned bytes to <paramref name="dest"/>. The working buffer is sized to a whole number
/// of units. Bytes that do not yet form a complete unit are moved to the front of the buffer and
/// completed by the next read; whatever is still incomplete when reading stops is discarded.
/// </summary>
/// <param name="src">Stream to read from, at its current position.</param>
/// <param name="dest">Stream receiving the transformed bytes.</param>
/// <param name="totalBytes">Upper bound on the number of source bytes consumed.</param>
/// <param name="unit">Size in bytes of one sample; chunks handed to the transform are multiples of it.</param>
/// <param name="transform">
/// Receives the working buffer and the count of leading bytes to convert; returns the bytes to write.
/// </param>
/// <param name="ct">Cancellation token passed to each read and write.</param>
private static async Task TransformBoundedAsync(
    Stream src, Stream dest, long totalBytes, int unit,
    Func<byte[], int, byte[]> transform, CancellationToken ct)
{
    // Largest multiple of unit that fits in 81920 bytes, but never smaller than one unit.
    var capacity = Math.Max(unit, (81920 / unit) * unit);
    var work = new byte[capacity];

    var leftover = 0; // bytes of an incomplete sample parked at the start of the buffer
    for (var pending = totalBytes; pending > 0;)
    {
        var request = (int)Math.Min(capacity - leftover, pending);
        if (request == 0)
        {
            break;
        }

        var got = await src.ReadAsync(work.AsMemory(leftover, request), ct);
        if (got == 0)
        {
            break; // source exhausted early
        }

        pending -= got;

        var available = leftover + got;
        var aligned = (available / unit) * unit;
        if (aligned > 0)
        {
            var converted = transform(work, aligned);
            await dest.WriteAsync(converted, ct);
        }

        // Slide the trailing partial sample to the front so the next read completes it.
        leftover = available - aligned;
        if (leftover > 0)
        {
            Array.Copy(work, aligned, work, 0, leftover);
        }
    }
}
|
||||
|
||||
/// <summary>How the source data region is turned into the stored standard-PCM data.</summary>
private enum NormalizeMode
{
    /// <summary>Source sample bytes are copied unchanged; only the header differs.</summary>
    Verbatim = 0,

    /// <summary>Source samples are IEEE floats and are converted to 24-bit PCM.</summary>
    Float = 1,

    /// <summary>Source samples sit in a wider container (e.g. 24-in-32) and are repacked to the valid depth.</summary>
    Padded = 2
}
|
||||
|
||||
/// <summary>
|
||||
/// Extracts the raw PCM data region and format parameters from a WAV buffer, reusing the
|
||||
/// same chunk-walk and validation as metadata extraction. Returns null if the buffer is not
|
||||
@@ -317,50 +494,17 @@ public class AudioProcessor
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Rebuilds an EXTENSIBLE WAV as a canonical 44-byte-header standard PCM WAV (audioFormat = 1)
|
||||
/// so the vault only ever holds a format the streaming pipeline already handles. Three source
|
||||
/// shapes are normalized:
|
||||
/// <list type="bullet">
|
||||
/// <item>EXTENSIBLE-PCM (depth == container): sample bytes are byte-identical to standard PCM and
|
||||
/// copied verbatim; only the header is replaced.</item>
|
||||
/// <item>IEEE float: 32-bit float samples are converted to 24-bit signed integer PCM.</item>
|
||||
/// <item>Padded container (e.g. 24-in-32): the padding/sign-extension bytes are stripped, keeping
|
||||
/// the lowest valid bytes per sample.</item>
|
||||
/// </list>
|
||||
/// The output header always reports the valid bit depth (<see cref="WavMetadata.BitsPerSample"/>).
|
||||
/// Builds the canonical 44-byte standard-PCM WAV header (audioFormat = 1) for a normalized stream.
|
||||
/// The body is written separately so no whole-file buffer is allocated; this only emits the header
|
||||
/// the streaming pipeline expects, reporting the valid (post-normalization) bit depth.
|
||||
/// </summary>
|
||||
private byte[] NormalizeToStandardPcm(byte[] buffer, WavMetadata metadata)
|
||||
private static byte[] BuildStandardPcmHeader(int channels, int sampleRate, int outBitsPerSample, long dataSize)
|
||||
{
|
||||
// Clamp the declared data size to what is actually present; some encoders overshoot.
|
||||
var dataStart = metadata.DataChunkPos + 8;
|
||||
var available = buffer.Length - dataStart;
|
||||
var srcDataSize = Math.Min(metadata.DataSize, available);
|
||||
|
||||
byte[] dataBytes;
|
||||
int outBitsPerSample;
|
||||
if (metadata.IsFloat)
|
||||
{
|
||||
dataBytes = ConvertFloatTo24BitPcm(buffer, dataStart, srcDataSize);
|
||||
outBitsPerSample = 24;
|
||||
}
|
||||
else if (metadata.IsPaddedContainer)
|
||||
{
|
||||
dataBytes = RepackPaddedContainer(buffer, dataStart, srcDataSize, metadata.ContainerBitsPerSample, metadata.BitsPerSample);
|
||||
outBitsPerSample = metadata.BitsPerSample;
|
||||
}
|
||||
else
|
||||
{
|
||||
dataBytes = new byte[srcDataSize];
|
||||
Array.Copy(buffer, dataStart, dataBytes, 0, srcDataSize);
|
||||
outBitsPerSample = metadata.BitsPerSample;
|
||||
}
|
||||
|
||||
var dataSize = dataBytes.Length;
|
||||
const int headerSize = 44;
|
||||
var result = new byte[headerSize + dataSize];
|
||||
var result = new byte[headerSize];
|
||||
|
||||
var blockAlign = (ushort)(metadata.Channels * (outBitsPerSample / 8));
|
||||
var byteRate = (uint)(metadata.SampleRate * blockAlign);
|
||||
var blockAlign = (ushort)(channels * (outBitsPerSample / 8));
|
||||
var byteRate = (uint)(sampleRate * blockAlign);
|
||||
|
||||
// RIFF header
|
||||
System.Text.Encoding.ASCII.GetBytes("RIFF").CopyTo(result, 0);
|
||||
@@ -371,8 +515,8 @@ public class AudioProcessor
|
||||
System.Text.Encoding.ASCII.GetBytes("fmt ").CopyTo(result, 12);
|
||||
BitConverter.GetBytes((uint)16).CopyTo(result, 16);
|
||||
BitConverter.GetBytes((ushort)1).CopyTo(result, 20); // audioFormat = PCM
|
||||
BitConverter.GetBytes((ushort)metadata.Channels).CopyTo(result, 22);
|
||||
BitConverter.GetBytes((uint)metadata.SampleRate).CopyTo(result, 24);
|
||||
BitConverter.GetBytes((ushort)channels).CopyTo(result, 22);
|
||||
BitConverter.GetBytes((uint)sampleRate).CopyTo(result, 24);
|
||||
BitConverter.GetBytes(byteRate).CopyTo(result, 28);
|
||||
BitConverter.GetBytes(blockAlign).CopyTo(result, 32);
|
||||
BitConverter.GetBytes((ushort)outBitsPerSample).CopyTo(result, 34);
|
||||
@@ -381,8 +525,6 @@ public class AudioProcessor
|
||||
System.Text.Encoding.ASCII.GetBytes("data").CopyTo(result, 36);
|
||||
BitConverter.GetBytes((uint)dataSize).CopyTo(result, 40);
|
||||
|
||||
Array.Copy(dataBytes, 0, result, headerSize, dataSize);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
@@ -459,7 +601,7 @@ public class AudioProcessor
|
||||
/// <summary>
|
||||
/// Finds a chunk in the WAV file buffer with proper alignment handling
|
||||
/// </summary>
|
||||
private int FindChunk(byte[] buffer, string chunkId)
|
||||
private static int FindChunk(byte[] buffer, string chunkId)
|
||||
{
|
||||
var chunkBytes = System.Text.Encoding.ASCII.GetBytes(chunkId);
|
||||
int offset = 12; // Start after RIFF header
|
||||
|
||||
@@ -24,18 +24,18 @@ public class AudioProcessorRouter
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Processes <paramref name="filePath"/> with the processor matching its extension, returning an
|
||||
/// <see cref="AudioBinary"/> carrying the stored bytes and extracted metadata. Throws
|
||||
/// <see cref="ArgumentException"/> for unsupported extensions.
|
||||
/// Processes <paramref name="filePath"/> with the processor matching its extension, returning a
|
||||
/// <see cref="ProcessedAudio"/> store plan (extracted metadata plus a streamed writer for the
|
||||
/// canonical vault bytes). Throws <see cref="ArgumentException"/> for unsupported extensions.
|
||||
/// </summary>
|
||||
public async Task<AudioBinary?> ProcessAudioFileAsync(string filePath)
|
||||
public async Task<ProcessedAudio?> ProcessAudioFileAsync(string filePath, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var ext = Path.GetExtension(filePath).ToLowerInvariant();
|
||||
return ext switch
|
||||
{
|
||||
".wav" => await _wavProcessor.ProcessWavFileAsync(filePath),
|
||||
".mp3" => await _mp3Processor.ProcessMp3FileAsync(filePath),
|
||||
".flac" => await _flacProcessor.ProcessFlacFileAsync(filePath),
|
||||
".wav" => await _wavProcessor.ProcessWavFileAsync(filePath, cancellationToken),
|
||||
".mp3" => await _mp3Processor.ProcessMp3FileAsync(filePath, cancellationToken),
|
||||
".flac" => await _flacProcessor.ProcessFlacFileAsync(filePath, cancellationToken),
|
||||
_ => throw new ArgumentException($"Unsupported audio format: {ext}", nameof(filePath)),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -0,0 +1,58 @@
|
||||
namespace DeepDrftContent.Processors;
|
||||
|
||||
/// <summary>
/// Bounded-buffer streaming primitives shared by the audio processors on the store path. Neither
/// helper holds a whole file in memory: the copy moves one fixed-size buffer at a time, and the
/// prefix read caps its allocation regardless of file size.
/// </summary>
internal static class AudioStoreStream
{
    private const int CopyBufferSize = 81920; // 80 KB

    // Largest byte[] the runtime allows (the value of Array.MaxLength); keeps the long -> int
    // narrowing in ReadPrefixAsync from wrapping when both the cap and the file exceed 2 GiB.
    private const int MaxPrefixBytes = 0x7FFFFFC7;

    /// <summary>
    /// Copies the file at <paramref name="sourcePath"/> into <paramref name="destination"/> using
    /// a single reusable <see cref="CopyBufferSize"/>-byte buffer, so no individual read or write
    /// exceeds that size. Used for passthrough formats whose stored bytes equal the source bytes.
    /// </summary>
    /// <param name="sourcePath">File to read; opened read-only with shared read access.</param>
    /// <param name="destination">Stream the bytes are written to.</param>
    /// <param name="ct">Cancellation token passed to each read and write.</param>
    public static async Task CopyFileAsync(string sourcePath, Stream destination, CancellationToken ct)
    {
        await using var src = new FileStream(
            sourcePath, FileMode.Open, FileAccess.Read, FileShare.Read,
            bufferSize: CopyBufferSize, useAsync: true);

        var buffer = new byte[CopyBufferSize];
        int read;
        while ((read = await src.ReadAsync(buffer, ct)) > 0)
        {
            await destination.WriteAsync(buffer.AsMemory(0, read), ct);
        }
    }

    /// <summary>
    /// Reads at most <paramref name="cap"/> bytes from the start of <paramref name="path"/>, enough
    /// for header/metadata parsing without loading the body. The allocation is
    /// <c>min(cap, fileLength)</c>, further limited to the maximum array length. Size-based
    /// metadata (e.g. average bitrate) must use the true file length, not the prefix length.
    /// </summary>
    /// <param name="path">File to read; opened read-only with shared read access.</param>
    /// <param name="cap">Maximum number of bytes to return; must be non-negative.</param>
    /// <param name="ct">Cancellation token passed to each read.</param>
    /// <returns>
    /// The leading bytes of the file; shorter than requested only if the file ends before the
    /// length observed when it was opened.
    /// </returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="cap"/> is negative.</exception>
    public static async Task<byte[]> ReadPrefixAsync(string path, long cap, CancellationToken ct)
    {
        if (cap < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(cap), cap, "Prefix cap must be non-negative.");
        }

        await using var fs = new FileStream(
            path, FileMode.Open, FileAccess.Read, FileShare.Read,
            bufferSize: CopyBufferSize, useAsync: true);

        // Clamp in long arithmetic first; only then is the narrowing to int safe.
        var length = (int)Math.Min(Math.Min(cap, fs.Length), MaxPrefixBytes);
        var buffer = new byte[length];
        var total = 0;
        while (total < length)
        {
            // A single ReadAsync may return fewer bytes than requested, so loop until full or EOF.
            var read = await fs.ReadAsync(buffer.AsMemory(total, length - total), ct);
            if (read == 0)
            {
                break;
            }

            total += read;
        }

        return total == length ? buffer : buffer[..total];
    }
}
|
||||
@@ -12,7 +12,11 @@ public class FlacAudioProcessor
|
||||
private const double FallbackDuration = 180.0;
|
||||
private const int FallbackBitrate = 1411;
|
||||
|
||||
public async Task<AudioBinary?> ProcessFlacFileAsync(string filePath)
|
||||
// STREAMINFO is mandatory and always the first metadata block, immediately after the 4-byte magic
|
||||
// (data at offset 8, 34 bytes). A small prefix read covers it without loading the body.
|
||||
private const long HeaderCap = 64 * 1024;
|
||||
|
||||
public async Task<ProcessedAudio?> ProcessFlacFileAsync(string filePath, CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (!File.Exists(filePath))
|
||||
{
|
||||
@@ -24,25 +28,21 @@ public class FlacAudioProcessor
|
||||
throw new ArgumentException("File must be a FLAC file", nameof(filePath));
|
||||
}
|
||||
|
||||
var buffer = await File.ReadAllBytesAsync(filePath);
|
||||
var meta = ExtractFlacMetadata(buffer);
|
||||
var fileLength = new FileInfo(filePath).Length;
|
||||
var window = await AudioStoreStream.ReadPrefixAsync(filePath, HeaderCap, cancellationToken);
|
||||
var meta = ExtractFlacMetadata(window, fileLength);
|
||||
|
||||
var parameters = new AudioBinaryParams(
|
||||
Buffer: buffer,
|
||||
Size: buffer.Length,
|
||||
Extension: ".flac",
|
||||
Duration: meta.Duration,
|
||||
Bitrate: meta.Bitrate);
|
||||
|
||||
return new AudioBinary(parameters);
|
||||
// FLAC is stored unmodified — passthrough the original bytes via a streamed disk-to-disk copy.
|
||||
return ProcessedAudio.Passthrough(filePath, ".flac", meta.Duration, meta.Bitrate, fileLength);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Validates the <c>fLaC</c> magic and the leading STREAMINFO block, then computes duration from
|
||||
/// total-samples / sample-rate and average bitrate from file size. On any parse failure, logs a
|
||||
/// warning and returns synthetic defaults — never throws.
|
||||
/// warning and returns synthetic defaults — never throws. <paramref name="fileLength"/> is the true
|
||||
/// file size (the header window may be shorter), used for the average-bitrate computation.
|
||||
/// </summary>
|
||||
private static FlacMetadata ExtractFlacMetadata(byte[] buffer)
|
||||
private static FlacMetadata ExtractFlacMetadata(byte[] buffer, long fileLength)
|
||||
{
|
||||
try
|
||||
{
|
||||
@@ -84,7 +84,7 @@ public class FlacAudioProcessor
|
||||
|
||||
var duration = (double)totalSamples / sampleRate;
|
||||
var bitrate = duration > 0
|
||||
? (int)(buffer.LongLength * 8L / (duration * 1000))
|
||||
? (int)(fileLength * 8L / (duration * 1000))
|
||||
: FallbackBitrate;
|
||||
|
||||
return new FlacMetadata { Duration = duration, Bitrate = bitrate };
|
||||
|
||||
@@ -25,7 +25,13 @@ public class Mp3AudioProcessor
|
||||
private const double FallbackDuration = 180.0;
|
||||
private const int FallbackBitrate = 320;
|
||||
|
||||
public async Task<AudioBinary?> ProcessMp3FileAsync(string filePath)
|
||||
// Metadata lives in the leading ID3v2 tag plus the first MPEG frame. Cap the header read so a
|
||||
// large MP3 is not pulled into memory whole just to read it; a tag larger than this (very large
|
||||
// embedded art) simply falls back to the CBR/default estimate, never an OOM. The body is stored
|
||||
// by streaming the original file, not from this window.
|
||||
private const long HeaderCap = 8 * 1024 * 1024;
|
||||
|
||||
public async Task<ProcessedAudio?> ProcessMp3FileAsync(string filePath, CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (!File.Exists(filePath))
|
||||
{
|
||||
@@ -37,24 +43,21 @@ public class Mp3AudioProcessor
|
||||
throw new ArgumentException("File must be an MP3 file", nameof(filePath));
|
||||
}
|
||||
|
||||
var buffer = await File.ReadAllBytesAsync(filePath);
|
||||
var meta = ExtractMp3Metadata(buffer);
|
||||
var fileLength = new FileInfo(filePath).Length;
|
||||
var window = await AudioStoreStream.ReadPrefixAsync(filePath, HeaderCap, cancellationToken);
|
||||
var meta = ExtractMp3Metadata(window, fileLength);
|
||||
|
||||
var parameters = new AudioBinaryParams(
|
||||
Buffer: buffer,
|
||||
Size: buffer.Length,
|
||||
Extension: ".mp3",
|
||||
Duration: meta.Duration,
|
||||
Bitrate: meta.Bitrate);
|
||||
|
||||
return new AudioBinary(parameters);
|
||||
// MP3 is stored unmodified — passthrough the original bytes via a streamed disk-to-disk copy.
|
||||
return ProcessedAudio.Passthrough(filePath, ".mp3", meta.Duration, meta.Bitrate, fileLength);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Parses the first valid MPEG frame (after any ID3v2 tag) and any Xing/VBRI tag inside it.
|
||||
/// On any parse failure, logs a warning and returns synthetic defaults — never throws.
|
||||
/// <paramref name="fileLength"/> is the true file size (the header window may be shorter), used
|
||||
/// for the CBR duration estimate.
|
||||
/// </summary>
|
||||
private static Mp3Metadata ExtractMp3Metadata(byte[] buffer)
|
||||
private static Mp3Metadata ExtractMp3Metadata(byte[] buffer, long fileLength)
|
||||
{
|
||||
try
|
||||
{
|
||||
@@ -65,7 +68,7 @@ public class Mp3AudioProcessor
|
||||
}
|
||||
|
||||
var header = DecodeFrameHeader(buffer, frameStart);
|
||||
var duration = ComputeDuration(buffer, frameStart, header);
|
||||
var duration = ComputeDuration(buffer, frameStart, header, fileLength);
|
||||
|
||||
return new Mp3Metadata { Duration = duration, Bitrate = header.BitrateKbps };
|
||||
}
|
||||
@@ -202,7 +205,7 @@ public class Mp3AudioProcessor
|
||||
/// Computes duration from a Xing/Info or VBRI tag (accurate for VBR) when present; otherwise
|
||||
/// falls back to the CBR estimate fileSize / (bitrate_kbps * 125). Guards divide-by-zero.
|
||||
/// </summary>
|
||||
private static double ComputeDuration(byte[] buffer, int frameStart, FrameHeader header)
|
||||
private static double ComputeDuration(byte[] buffer, int frameStart, FrameHeader header, long fileLength)
|
||||
{
|
||||
var xingFrames = ReadXingFrameCount(buffer, frameStart, header);
|
||||
if (xingFrames > 0 && header.SampleRate > 0)
|
||||
@@ -216,10 +219,10 @@ public class Mp3AudioProcessor
|
||||
return (double)vbriFrames * header.SamplesPerFrame / header.SampleRate;
|
||||
}
|
||||
|
||||
// CBR fallback: bitrate_kbps * 1000 / 8 bytes per second = bitrate_kbps * 125.
|
||||
// Exclude the ID3v2 tag bytes (everything before frameStart) from the estimate.
|
||||
// CBR fallback: bitrate_kbps * 1000 / 8 bytes per second = bitrate_kbps * 125. Uses the true
|
||||
// file length (not the bounded header window), excluding the ID3v2 tag bytes before frameStart.
|
||||
var bytesPerSecond = header.BitrateKbps * 125;
|
||||
return bytesPerSecond > 0 ? (double)(buffer.Length - frameStart) / bytesPerSecond : FallbackDuration;
|
||||
return bytesPerSecond > 0 ? (double)(fileLength - frameStart) / bytesPerSecond : FallbackDuration;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -0,0 +1,68 @@
|
||||
namespace DeepDrftContent.Processors;
|
||||
|
||||
/// <summary>
/// Result of processing an uploaded audio file on the store path: the extracted metadata plus a
/// writer delegate that emits the bytes to be stored into a destination stream, so the file is
/// never held as a single managed <c>byte[]</c>.
///
/// Passthrough plans (see <see cref="Passthrough"/>) copy the source file as-is; other plans
/// supply their own writer. Because a passthrough writer re-opens the source by path, the source
/// file must still exist at the time <see cref="WriteToAsync"/> runs.
/// </summary>
public sealed class ProcessedAudio
{
    private readonly Func<Stream, CancellationToken, Task> _writeTo;

    /// <summary>Creates a store plan from already-extracted metadata and a writer delegate.</summary>
    /// <param name="extension">Stored file extension, e.g. <c>.wav</c>.</param>
    /// <param name="duration">Duration in seconds.</param>
    /// <param name="bitrate">Bitrate in kbps.</param>
    /// <param name="size">Byte count the writer is expected to produce.</param>
    /// <param name="writeTo">Delegate that writes the stored bytes to the stream it is given.</param>
    public ProcessedAudio(
        string extension,
        double duration,
        int bitrate,
        long size,
        Func<Stream, CancellationToken, Task> writeTo)
    {
        (Extension, Duration, Bitrate, Size) = (extension, duration, bitrate, size);
        _writeTo = writeTo;
    }

    /// <summary>The stored file extension (e.g. <c>.wav</c>, <c>.mp3</c>, <c>.flac</c>).</summary>
    public string Extension { get; }

    /// <summary>Audio duration in seconds.</summary>
    public double Duration { get; }

    /// <summary>Audio bitrate in kbps.</summary>
    public int Bitrate { get; }

    /// <summary>The stored byte count as supplied at construction; it is not measured from the written output.</summary>
    public long Size { get; }

    /// <summary>
    /// Builds a plan whose stored bytes are a straight copy of <paramref name="sourcePath"/>,
    /// performed by <c>AudioStoreStream.CopyFileAsync</c> when the plan is written.
    /// </summary>
    /// <param name="sourcePath">File to copy from when the plan is written.</param>
    /// <param name="extension">Stored file extension.</param>
    /// <param name="duration">Duration in seconds.</param>
    /// <param name="bitrate">Bitrate in kbps.</param>
    /// <param name="sourceLength">Length of the source file, reported as <see cref="Size"/>.</param>
    public static ProcessedAudio Passthrough(
        string sourcePath, string extension, double duration, int bitrate, long sourceLength)
    {
        Func<Stream, CancellationToken, Task> copySource =
            (destination, ct) => AudioStoreStream.CopyFileAsync(sourcePath, destination, ct);

        return new ProcessedAudio(extension, duration, bitrate, sourceLength, copySource);
    }

    /// <summary>Runs the writer delegate against <paramref name="destination"/>.</summary>
    /// <param name="destination">Stream receiving the stored bytes.</param>
    /// <param name="cancellationToken">Forwarded to the writer delegate.</param>
    public Task WriteToAsync(Stream destination, CancellationToken cancellationToken = default)
    {
        return _writeTo(destination, cancellationToken);
    }
}
|
||||
@@ -40,13 +40,15 @@ public class TrackContentService
|
||||
string? album = null,
|
||||
string? genre = null,
|
||||
DateOnly? releaseDate = null,
|
||||
string? originalFileName = null)
|
||||
string? originalFileName = null,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Process the audio file (routed by extension)
|
||||
var audioBinary = await _audioProcessorRouter.ProcessAudioFileAsync(audioFilePath);
|
||||
if (audioBinary == null)
|
||||
// Process the audio file (routed by extension). The returned plan carries metadata plus a
|
||||
// streamed writer — no whole-file buffer (the store-path OOM fix).
|
||||
var processed = await _audioProcessorRouter.ProcessAudioFileAsync(audioFilePath, cancellationToken);
|
||||
if (processed == null)
|
||||
{
|
||||
throw new InvalidOperationException("Failed to process audio file");
|
||||
}
|
||||
@@ -60,8 +62,11 @@ public class TrackContentService
|
||||
await _fileDatabase.CreateVaultAsync(VaultConstants.Tracks, MediaVaultType.Audio);
|
||||
}
|
||||
|
||||
// Store the audio in FileDatabase
|
||||
var success = await _fileDatabase.RegisterResourceAsync(VaultConstants.Tracks, trackId, audioBinary);
|
||||
// Stream the audio into the vault. The metadata is supplied directly (there is no in-memory
|
||||
// AudioBinary on this path), and the bytes are written progressively from the staging file.
|
||||
var metaData = MetaDataFactory.CreateAudioMetaData(trackId, processed.Extension, processed.Duration, processed.Bitrate);
|
||||
var success = await _fileDatabase.RegisterResourceStreamingAsync(
|
||||
VaultConstants.Tracks, trackId, metaData, processed.WriteToAsync, cancellationToken);
|
||||
if (!success)
|
||||
{
|
||||
throw new InvalidOperationException("Failed to store audio in FileDatabase");
|
||||
@@ -77,7 +82,7 @@ public class TrackContentService
|
||||
OriginalFileName = originalFileName,
|
||||
// Persist the processor-extracted runtime to SQL so aggregate queries (total mix runtime)
|
||||
// need not touch the vault. Same value the high-res waveform compute reads downstream.
|
||||
DurationSeconds = audioBinary.Duration
|
||||
DurationSeconds = processed.Duration
|
||||
};
|
||||
|
||||
return trackEntity;
|
||||
@@ -100,34 +105,37 @@ public class TrackContentService
|
||||
string? album = null,
|
||||
string? genre = null,
|
||||
DateOnly? releaseDate = null,
|
||||
string? originalFileName = null) =>
|
||||
AddTrackAsync(wavFilePath, trackName, artist, album, genre, releaseDate, originalFileName);
|
||||
string? originalFileName = null,
|
||||
CancellationToken cancellationToken = default) =>
|
||||
AddTrackAsync(wavFilePath, trackName, artist, album, genre, releaseDate, originalFileName, cancellationToken);
|
||||
|
||||
/// <summary>
|
||||
/// Swaps the audio bytes for an existing track in place: processes a new audio file and
|
||||
/// re-registers it under the SAME <paramref name="entryKey"/> in the tracks vault. The track's
|
||||
/// vault key — and therefore its SQL link, release membership, position, and metadata — is
|
||||
/// untouched; only the binary changes. The new audio is written first; only on confirmed success
|
||||
/// is a stale old backing file cleaned up. A cross-format replacement (e.g. .wav → .flac) leaves
|
||||
/// the old file on disk under its former filename once the index is updated; the post-success
|
||||
/// cleanup removes it. For a same-extension overwrite the register alone suffices — the file is
|
||||
/// written in place. If the register fails the original audio is left intact and null is returned,
|
||||
/// so the track remains playable. Returns the freshly stored <see cref="AudioBinary"/> on success
|
||||
/// (so the caller can regenerate waveform data from the same bytes) — matching the FileDatabase
|
||||
/// swallow-and-return-null contract.
|
||||
/// untouched; only the binary changes. The new audio is streamed to the vault first; only on
|
||||
/// confirmed success is a stale old backing file cleaned up. A cross-format replacement (e.g.
|
||||
/// .wav → .flac) leaves the old file on disk under its former filename once the index is updated;
|
||||
/// the post-success cleanup removes it. For a same-extension overwrite the register alone suffices.
|
||||
/// If the register fails the original audio is left intact and null is returned, so the track
|
||||
/// remains playable. Returns the freshly stored audio's <b>duration</b> on success (the caller
|
||||
/// re-reads the vault for waveform regen and uses this for the SQL duration write) — matching the
|
||||
/// FileDatabase swallow-and-return-null contract. The new bytes are never materialized in memory.
|
||||
/// </summary>
|
||||
public async Task<AudioBinary?> ReplaceTrackAudioAsync(string entryKey, string audioFilePath)
|
||||
public async Task<double?> ReplaceTrackAudioAsync(string entryKey, string audioFilePath, CancellationToken cancellationToken = default)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Capture the old extension before touching the vault. After register the index
|
||||
// will point to the new extension, so we need the old value now to detect a
|
||||
// cross-format swap and clean up the stale file post-success.
|
||||
var existing = await _fileDatabase.LoadResourceAsync<AudioBinary>(VaultConstants.Tracks, entryKey);
|
||||
var oldExtension = existing?.Extension;
|
||||
// Capture the old extension from the index metadata (not by loading the file — that would
|
||||
// pull the whole old audio into memory). After register the index points to the new
|
||||
// extension, so we need the old value now to detect a cross-format swap and clean up the
|
||||
// stale file post-success.
|
||||
var trackVault = _fileDatabase.GetVault(VaultConstants.Tracks);
|
||||
var existingMeta = trackVault is null ? null : await trackVault.GetEntryMetadata(entryKey);
|
||||
var oldExtension = existingMeta?.Extension;
|
||||
|
||||
var audioBinary = await _audioProcessorRouter.ProcessAudioFileAsync(audioFilePath);
|
||||
if (audioBinary == null)
|
||||
var processed = await _audioProcessorRouter.ProcessAudioFileAsync(audioFilePath, cancellationToken);
|
||||
if (processed == null)
|
||||
{
|
||||
Console.WriteLine($"TrackContentService.ReplaceTrackAudioAsync: processing returned null for {entryKey}");
|
||||
return null;
|
||||
@@ -138,9 +146,11 @@ public class TrackContentService
|
||||
await _fileDatabase.CreateVaultAsync(VaultConstants.Tracks, MediaVaultType.Audio);
|
||||
}
|
||||
|
||||
// Register the new audio. This upserts the index entry (new extension recorded) and
|
||||
// writes the new file to disk. If this fails the original entry and file are untouched.
|
||||
var success = await _fileDatabase.RegisterResourceAsync(VaultConstants.Tracks, entryKey, audioBinary);
|
||||
// Stream the new audio in. This upserts the index entry (new extension recorded) and writes
|
||||
// the new file to disk. If this fails the original entry and file are untouched.
|
||||
var metaData = MetaDataFactory.CreateAudioMetaData(entryKey, processed.Extension, processed.Duration, processed.Bitrate);
|
||||
var success = await _fileDatabase.RegisterResourceStreamingAsync(
|
||||
VaultConstants.Tracks, entryKey, metaData, processed.WriteToAsync, cancellationToken);
|
||||
if (!success)
|
||||
{
|
||||
Console.WriteLine($"TrackContentService.ReplaceTrackAudioAsync: vault write failed for {entryKey}; original audio preserved");
|
||||
@@ -153,7 +163,7 @@ public class TrackContentService
|
||||
// old path — RemoveResourceAsync would now resolve to the new extension and delete the
|
||||
// wrong file. Non-fatal: an orphaned old file is a disk-hygiene concern, not a
|
||||
// playback issue (the index no longer references it).
|
||||
if (oldExtension != null && oldExtension != audioBinary.Extension)
|
||||
if (oldExtension != null && oldExtension != processed.Extension)
|
||||
{
|
||||
var vault = _fileDatabase.GetVault(VaultConstants.Tracks);
|
||||
if (vault != null)
|
||||
@@ -172,7 +182,7 @@ public class TrackContentService
|
||||
}
|
||||
}
|
||||
|
||||
return audioBinary;
|
||||
return processed.Duration;
|
||||
}
|
||||
catch (Exception ex) when (ex is not OperationCanceledException)
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user