Stream Opus transcode source and encoded output; removes last store-path OOM
Source read via streamed vault open + bounded staging copy (index-only duration/extension); encoded output walked from a bounded stream (new OggOpusParser.WalkAsync, byte-identical to the buffer oracle) and stored via streaming vault write. Adds parity tests.
This commit is contained in:
@@ -17,17 +17,20 @@ namespace DeepDrftContent.Processors.Opus;
|
||||
public sealed class OpusTranscodeService
|
||||
{
|
||||
private readonly FileDb _fileDatabase;
|
||||
private readonly TrackContentService _trackContent;
|
||||
private readonly FfmpegOpusEncoder _encoder;
|
||||
private readonly OpusTranscodeOptions _options;
|
||||
private readonly ILogger<OpusTranscodeService> _logger;
|
||||
|
||||
public OpusTranscodeService(
|
||||
FileDb fileDatabase,
|
||||
TrackContentService trackContent,
|
||||
FfmpegOpusEncoder encoder,
|
||||
IOptions<OpusTranscodeOptions> options,
|
||||
ILogger<OpusTranscodeService> logger)
|
||||
{
|
||||
_fileDatabase = fileDatabase;
|
||||
_trackContent = trackContent;
|
||||
_encoder = encoder;
|
||||
_options = options.Value;
|
||||
_logger = logger;
|
||||
@@ -43,27 +46,52 @@ public sealed class OpusTranscodeService
|
||||
/// </summary>
|
||||
public async Task<bool> TranscodeAndStoreAsync(string entryKey, CancellationToken ct)
|
||||
{
|
||||
var source = await _fileDatabase.LoadResourceAsync<AudioBinary>(VaultConstants.Tracks, entryKey);
|
||||
if (source is null)
|
||||
// Read the source extension + duration from the vault index (no body load) and open a streamed
|
||||
// read over the source bytes — never the whole-buffer AudioBinary. A 92-min mix source is ~970 MB;
|
||||
// buffering it (and the encoded output below) was the last unconverted store-path OOM violation.
|
||||
var trackDuration = await _trackContent.GetAudioDurationAsync(entryKey) ?? 0.0;
|
||||
var sourceMedia = await _trackContent.OpenAudioMediaStreamAsync(entryKey);
|
||||
if (sourceMedia is null)
|
||||
{
|
||||
_logger.LogWarning("Opus transcode: no source audio in vault for {EntryKey}; skipping.", entryKey);
|
||||
return false;
|
||||
}
|
||||
|
||||
Directory.CreateDirectory(_options.StagingPath);
|
||||
var sourcePath = Path.Combine(_options.StagingPath, $"opus-src-{Guid.NewGuid():N}{source.Extension}");
|
||||
var opusPath = Path.Combine(_options.StagingPath, $"opus-out-{Guid.NewGuid():N}{OggOpusConstants.OpusExtension}");
|
||||
|
||||
string? sourcePath = null;
|
||||
string? opusPath = null;
|
||||
try
|
||||
{
|
||||
await File.WriteAllBytesAsync(sourcePath, source.Buffer, ct);
|
||||
// Stage the source to disk in bounded chunks so ffmpeg can read it by file path/extension.
|
||||
// The inner finally disposes the source stream as soon as the copy is done — the read handle
|
||||
// is not held across the (long) encode — and guarantees disposal even if staging setup throws.
|
||||
try
|
||||
{
|
||||
Directory.CreateDirectory(_options.StagingPath);
|
||||
sourcePath = Path.Combine(_options.StagingPath, $"opus-src-{Guid.NewGuid():N}{sourceMedia.Extension}");
|
||||
opusPath = Path.Combine(_options.StagingPath, $"opus-out-{Guid.NewGuid():N}{OggOpusConstants.OpusExtension}");
|
||||
|
||||
await using var staging = new FileStream(
|
||||
sourcePath, FileMode.Create, FileAccess.Write, FileShare.None,
|
||||
bufferSize: 81920, useAsync: true);
|
||||
await sourceMedia.Stream.CopyToAsync(staging, bufferSize: 81920, ct);
|
||||
}
|
||||
finally
|
||||
{
|
||||
await sourceMedia.DisposeAsync();
|
||||
}
|
||||
|
||||
if (!await _encoder.EncodeAsync(sourcePath, opusPath, ct))
|
||||
return false; // encoder already logged the cause
|
||||
|
||||
var opusBytes = await File.ReadAllBytesAsync(opusPath, ct);
|
||||
|
||||
var walk = OggOpusParser.Walk(opusBytes);
|
||||
// Walk the encoded output from a streamed read in a bounded buffer (no whole-file load). The
|
||||
// seek index and setup header are byte-identical to the buffer walk (parity-tested).
|
||||
OggOpusWalk? walk;
|
||||
await using (var opusIn = new FileStream(
|
||||
opusPath, FileMode.Open, FileAccess.Read, FileShare.Read,
|
||||
bufferSize: 81920, useAsync: true))
|
||||
{
|
||||
walk = await OggOpusParser.WalkAsync(opusIn, ct);
|
||||
}
|
||||
if (walk is null)
|
||||
{
|
||||
_logger.LogError(
|
||||
@@ -74,27 +102,32 @@ public sealed class OpusTranscodeService
|
||||
|
||||
await EnsureVaultAsync();
|
||||
|
||||
var opusBitrate = source.Duration > 0
|
||||
? (int)(opusBytes.Length * 8 / source.Duration / 1000)
|
||||
// Bitrate from the output file length + duration — both available without buffering the bytes.
|
||||
var opusLength = new FileInfo(opusPath).Length;
|
||||
var opusBitrate = trackDuration > 0
|
||||
? (int)(opusLength * 8 / trackDuration / 1000)
|
||||
: _options.BitrateKbps;
|
||||
var audioBinary = new AudioBinary(new AudioBinaryParams(
|
||||
opusBytes, opusBytes.Length, OggOpusConstants.OpusExtension, source.Duration, opusBitrate));
|
||||
|
||||
var sidecar = new OpusSidecar(walk.SetupHeaderBytes, walk.SeekIndex).ToBytes();
|
||||
var sidecarBinary = new MediaBinary(new MediaBinaryParams(
|
||||
sidecar, sidecar.Length, OggOpusConstants.SidecarExtension));
|
||||
|
||||
// Store the audio first, then the sidecar. If the sidecar write fails the Opus bytes are
|
||||
// present but unseekable — treat that as a failed derive (return false) so a backfill re-runs
|
||||
// it; do not leave a half-derived track that the delivery layer would treat as complete.
|
||||
var audioStored = await _fileDatabase.RegisterResourceAsync(
|
||||
VaultConstants.TrackOpus, OpusAudioKey(entryKey), audioBinary);
|
||||
var audioMeta = MetaDataFactory.CreateAudioMetaData(
|
||||
OpusAudioKey(entryKey), OggOpusConstants.OpusExtension, trackDuration, opusBitrate);
|
||||
var stagedOpusPath = opusPath;
|
||||
var audioStored = await _fileDatabase.RegisterResourceStreamingAsync(
|
||||
VaultConstants.TrackOpus, OpusAudioKey(entryKey), audioMeta,
|
||||
(destination, token) => AudioStoreStream.CopyFileAsync(stagedOpusPath, destination, token), ct);
|
||||
if (!audioStored)
|
||||
{
|
||||
_logger.LogError("Opus transcode: vault write of Opus audio failed for {EntryKey}.", entryKey);
|
||||
return false;
|
||||
}
|
||||
|
||||
// The sidecar is the setup header (a few KB) plus the seek index (~16 bytes per 0.5 s bucket);
|
||||
// it is inherently bounded and already in managed memory, so the whole-buffer write is correct.
|
||||
var sidecar = new OpusSidecar(walk.SetupHeaderBytes, walk.SeekIndex).ToBytes();
|
||||
var sidecarBinary = new MediaBinary(new MediaBinaryParams(
|
||||
sidecar, sidecar.Length, OggOpusConstants.SidecarExtension));
|
||||
var sidecarStored = await _fileDatabase.RegisterResourceAsync(
|
||||
VaultConstants.TrackOpus, OpusSidecarKey(entryKey), sidecarBinary);
|
||||
if (!sidecarStored)
|
||||
@@ -105,7 +138,7 @@ public sealed class OpusTranscodeService
|
||||
|
||||
_logger.LogInformation(
|
||||
"Opus transcode complete for {EntryKey}: {OpusBytes} bytes, {Points} seek points, {Duration:F1}s.",
|
||||
entryKey, opusBytes.Length, walk.SeekIndex.Points.Count, walk.SeekIndex.TotalDurationSeconds);
|
||||
entryKey, opusLength, walk.SeekIndex.Points.Count, walk.SeekIndex.TotalDurationSeconds);
|
||||
return true;
|
||||
}
|
||||
catch (OperationCanceledException) when (ct.IsCancellationRequested)
|
||||
@@ -119,8 +152,10 @@ public sealed class OpusTranscodeService
|
||||
}
|
||||
finally
|
||||
{
|
||||
TryDelete(sourcePath);
|
||||
TryDelete(opusPath);
|
||||
if (sourcePath is not null)
|
||||
TryDelete(sourcePath);
|
||||
if (opusPath is not null)
|
||||
TryDelete(opusPath);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user