Files
deepdrft/DeepDrftContent/Processors/Opus/OggOpusParser.cs
T
daniel-c-harvey 4351ae04be Stream Opus transcode source and encoded output; removes last store-path OOM
Source read via streamed vault open + bounded staging copy (index-only duration/extension); encoded output walked from a bounded stream (new OggOpusParser.WalkAsync, byte-identical to the buffer oracle) and stored via streaming vault write. Adds parity tests.
2026-06-26 14:06:33 -04:00

311 lines
16 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System.Buffers.Binary;
namespace DeepDrftContent.Processors.Opus;
/// <summary>
/// The result of walking an encoded Ogg Opus stream once: the captured setup header (the leading
/// <c>OpusHead</c> + <c>OpusTags</c> pages, verbatim) and the bucketed granule→byte seek index. This
/// is everything the sidecar artifact carries (§3.4a) — built at transcode time so delivery never
/// re-walks the stream.
/// </summary>
/// <remarks>
/// <see cref="SetupHeaderBytes"/> is a plain (mutable) array, and the compiler-synthesized record
/// equality compares array members by reference, not by content. When checking two walks for byte
/// identity (e.g. the buffer-vs-streaming parity oracle), compare the header bytes explicitly
/// (for example with <c>SequenceEqual</c>) rather than relying on record <c>==</c>/<c>Equals</c>.
/// </remarks>
/// <param name="SetupHeaderBytes">The leading setup pages (OpusHead + OpusTags), exactly as they
/// appear at the start of the stream, ready to prepend to any mid-stream page run before decode.</param>
/// <param name="SeekIndex">The accurate, 0.5 s-bucketed granule→byte transfer function.</param>
public sealed record OggOpusWalk(byte[] SetupHeaderBytes, OggOpusSeekIndex SeekIndex);
/// <summary>
/// Pure Ogg-Opus stream walker. Reads the page structure directly (the <c>OggS</c> capture pattern and
/// the 27-byte page header) to (1) capture the setup-header pages and (2) record, for audio pages that
/// carry a granule position, the end granule position and exact byte offset — bucketed to 0.5 s with
/// each bucket boundary snapped to the nearest enclosing page start. No external dependency: the
/// encoder (FFmpeg) produces the bytes; this turns them into the seek artifact deterministically, so it
/// is unit-testable without a codec.
/// </summary>
/// <remarks>
/// Two entry points share one <see cref="WalkState"/> page-processing core, so they produce byte-identical
/// output by construction (the project's parity-oracle convention, mirroring
/// <c>RmsLoudnessAlgorithm.Compute</c> over its accumulator):
/// <list type="bullet">
/// <item><see cref="Walk(ReadOnlySpan{byte})"/> — the whole-buffer overload, retained as the byte-identity
/// parity oracle for the streaming variant.</item>
/// <item><see cref="WalkAsync(System.IO.Stream,System.Threading.CancellationToken)"/> — the streaming
/// variant: walks the page structure from a forward stream in a bounded read buffer (one Ogg page at a
/// time), so peak managed memory is O(buffer + seek-index + setup-header), independent of file size.</item>
/// </list>
/// The setup header follows RFC 7845 §3: the OpusHead page, then every page of the OpusTags (comment
/// header) packet — which may span several pages — ending on the page where that packet completes.
/// Audio pages are only those that follow it.
/// </remarks>
public static class OggOpusParser
{
    /// <summary>
    /// The largest a single Ogg page can be: header(27) + a full 255-entry segment table + the maximum
    /// payload those segments can describe (255 × 255 bytes). The streaming read buffer is floored at this
    /// so a complete page always fits, which means a short read on a page can only mean a truncated stream.
    /// </summary>
    private const int MaxOggPageSize = OggOpusConstants.OggPageHeaderSize + 255 + 255 * 255; // 65307

    /// <summary>
    /// Walks <paramref name="oggBytes"/> and produces the setup header + seek index, or null if the
    /// bytes are not a recognisable Ogg Opus stream (no setup header, no audio pages, truncated
    /// structure, or a setup header larger than the capture cap). A null is the caller's signal to
    /// treat the transcode as failed and leave the track lossless-only (C6) — it does not throw for
    /// malformed input.
    /// </summary>
    /// <param name="oggBytes">The complete encoded Ogg Opus stream.</param>
    /// <returns>The walk result, or null when the stream is rejected.</returns>
    public static OggOpusWalk? Walk(ReadOnlySpan<byte> oggBytes)
    {
        var state = new WalkState();
        var offset = 0;
        while (offset + OggOpusConstants.OggPageHeaderSize <= oggBytes.Length)
        {
            var page = oggBytes.Slice(offset);
            if (!page[..4].SequenceEqual(OggOpusConstants.CapturePattern))
            {
                // Not on a page boundary — the encoder writes contiguous pages, so this means the
                // stream is malformed or we mis-stepped. Either way it is unrecoverable here.
                return null;
            }

            var segmentCount = page[OggOpusConstants.PageSegmentCountOffset];
            var segmentTableEnd = OggOpusConstants.OggPageHeaderSize + segmentCount;
            if (segmentTableEnd > page.Length)
                return null; // truncated header

            var payloadSize = 0;
            for (var i = 0; i < segmentCount; i++)
                payloadSize += page[OggOpusConstants.OggPageHeaderSize + i];

            var pageTotalSize = segmentTableEnd + payloadSize;
            if (pageTotalSize > page.Length)
                return null; // truncated payload

            var payload = page.Slice(segmentTableEnd, payloadSize);
            var granule = BinaryPrimitives.ReadUInt64LittleEndian(
                page.Slice(OggOpusConstants.GranulePositionOffset, 8));
            state.AddPage(granule, payload, page.Slice(0, pageTotalSize), offset);
            offset += pageTotalSize;
        }

        // Fewer than a full fixed header's worth of bytes remain: the natural end of the stream.
        return state.Finish((ulong)oggBytes.Length);
    }

    /// <summary>
    /// Streaming counterpart of <see cref="Walk(ReadOnlySpan{byte})"/>: walks the Ogg page structure from
    /// a forward <paramref name="stream"/> in a bounded read buffer (one Ogg page at a time), producing a
    /// byte-identical <see cref="OggOpusWalk"/> without ever holding the whole encoded file in memory.
    /// Returns null on the same malformed/truncated conditions as the buffer overload — it does not throw
    /// for bad input (only <see cref="OperationCanceledException"/> propagates on cancellation).
    /// </summary>
    /// <param name="stream">A readable stream positioned at the start of the Ogg Opus data.</param>
    /// <param name="cancellationToken">Flows to every read on <paramref name="stream"/>.</param>
    public static Task<OggOpusWalk?> WalkAsync(Stream stream, CancellationToken cancellationToken = default)
        => WalkAsync(stream, MaxOggPageSize, cancellationToken);

    /// <summary>
    /// Buffer-size-parameterised core. <paramref name="bufferSize"/> is floored at
    /// <see cref="MaxOggPageSize"/> so any single page always fits in the buffer; the absolute byte
    /// cursor (<c>absoluteOffset</c>) advances as the window compacts, so recorded seek offsets stay
    /// absolute even though the buffer holds only a small window at any instant.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null (surfaced through the
    /// returned task).</exception>
    internal static async Task<OggOpusWalk?> WalkAsync(Stream stream, int bufferSize, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(stream);

        var state = new WalkState();
        var buffer = new byte[Math.Max(bufferSize, MaxOggPageSize)];
        var len = 0;              // valid bytes held at buffer[0..len]
        long absoluteOffset = 0;  // absolute stream position of buffer[0]
        while (true)
        {
            // The buffer overload's loop guard requires a full fixed header before parsing a page; once
            // fewer than that remain (and the stream is drained) it is the natural end of the stream.
            if (len < OggOpusConstants.OggPageHeaderSize)
            {
                len += await FillAsync(stream, buffer, len, cancellationToken).ConfigureAwait(false);
                if (len < OggOpusConstants.OggPageHeaderSize)
                    break;
            }

            if (!buffer.AsSpan(0, 4).SequenceEqual(OggOpusConstants.CapturePattern))
                return null;

            var segmentCount = buffer[OggOpusConstants.PageSegmentCountOffset];
            var segmentTableEnd = OggOpusConstants.OggPageHeaderSize + segmentCount;
            if (len < segmentTableEnd)
            {
                len += await FillAsync(stream, buffer, len, cancellationToken).ConfigureAwait(false);
                if (len < segmentTableEnd)
                    return null; // truncated header
            }

            var payloadSize = 0;
            for (var i = 0; i < segmentCount; i++)
                payloadSize += buffer[OggOpusConstants.OggPageHeaderSize + i];

            var pageTotalSize = segmentTableEnd + payloadSize;
            if (len < pageTotalSize)
            {
                len += await FillAsync(stream, buffer, len, cancellationToken).ConfigureAwait(false);
                if (len < pageTotalSize)
                    return null; // truncated payload (page never fully arrived before EOF)
            }

            var page = buffer.AsSpan(0, pageTotalSize);
            var granule = BinaryPrimitives.ReadUInt64LittleEndian(
                page.Slice(OggOpusConstants.GranulePositionOffset, 8));
            var payload = page.Slice(segmentTableEnd, payloadSize);
            state.AddPage(granule, payload, page, absoluteOffset);

            // Compact the consumed page off the front of the window; the absolute cursor advances by the
            // exact page size so every offset the state records remains an absolute stream position.
            var remaining = len - pageTotalSize;
            if (remaining > 0)
                Buffer.BlockCopy(buffer, pageTotalSize, buffer, 0, remaining);
            len = remaining;
            absoluteOffset += pageTotalSize;
        }

        // Consumed bytes plus the (< header-sized) trailing remainder equal the full stream length,
        // matching the buffer overload's oggBytes.Length.
        return state.Finish((ulong)absoluteOffset + (ulong)len);
    }

    /// <summary>
    /// Fills <paramref name="buffer"/> from <paramref name="offset"/> to its end, issuing as many reads as
    /// needed until the buffer is full or the stream is exhausted. Returns the number of bytes added.
    /// </summary>
    private static async Task<int> FillAsync(Stream stream, byte[] buffer, int offset, CancellationToken ct)
    {
        var added = 0;
        while (offset + added < buffer.Length)
        {
            var read = await stream
                .ReadAsync(buffer.AsMemory(offset + added, buffer.Length - offset - added), ct)
                .ConfigureAwait(false);
            if (read == 0)
                break; // end of stream
            added += read;
        }

        return added;
    }

    /// <summary>True when <paramref name="payload"/> begins with <paramref name="signature"/>.</summary>
    private static bool StartsWith(ReadOnlySpan<byte> payload, ReadOnlySpan<byte> signature) =>
        payload.Length >= signature.Length && payload[..signature.Length].SequenceEqual(signature);

    /// <summary>
    /// The single page-processing core both <see cref="Walk(ReadOnlySpan{byte})"/> and
    /// <see cref="WalkAsync(Stream,int,CancellationToken)"/> drive, page by page, in stream order. Holding
    /// the setup-header + seek-index accumulation here is what makes the two entry points byte-identical
    /// by construction: there is exactly one copy of the OpusHead/OpusTags detection, pre-skip correction,
    /// t=0 anchoring, and 0.5 s bucketing logic.
    /// </summary>
    private sealed class WalkState
    {
        // The real setup (OpusHead + OpusTags pages) is normally a few KB; this cap bounds the capture so
        // a malformed stream (e.g. OpusHead never followed by a complete OpusTags packet) cannot grow it
        // without limit. Once the cap is hit the captured bytes can no longer cover the whole setup
        // header, so Finish rejects the stream instead of emitting a truncated header.
        private const int MaxSetupHeaderBytes = 8 * 1024 * 1024;

        private bool _sawOpusHead;
        private bool _sawOpusTags;

        // True while the OpusTags packet has started but not yet completed. RFC 7845 §3 allows the
        // comment header to span several pages; pages seen in this state belong to the setup header.
        private bool _commentHeaderOpen;

        // Set when the setup capture hit MaxSetupHeaderBytes; the captured header is then incomplete.
        private bool _setupCaptureOverflowed;

        private ushort _preSkip;
        private bool _capturingSetup = true;
        private readonly List<byte> _setupHeader = new();
        private readonly List<OpusSeekPoint> _points = new();
        private ulong _lastGranule;
        private double _nextBucketTime;
        private bool _firstAudioPointTaken;

        /// <summary>
        /// Processes one fully-framed page. <paramref name="pageBytes"/> is the whole page (header +
        /// segment table + payload) for verbatim setup capture; <paramref name="absoluteOffset"/> is the
        /// page's absolute start in the stream — the value recorded in the seek index.
        /// </summary>
        public void AddPage(ulong granule, ReadOnlySpan<byte> payload, ReadOnlySpan<byte> pageBytes, long absoluteOffset)
        {
            if (_capturingSetup)
            {
                if (_setupHeader.Count + pageBytes.Length > MaxSetupHeaderBytes)
                {
                    // Give up the capture and remember why: a later OpusTags page must not turn this
                    // into a result whose setup header is missing pages.
                    _capturingSetup = false;
                    _setupCaptureOverflowed = true;
                }
                else
                {
                    _setupHeader.AddRange(pageBytes);
                }
            }

            if (!_sawOpusHead && StartsWith(payload, OggOpusConstants.OpusHeadSignature))
            {
                _sawOpusHead = true;
                // RFC 7845 §5.1 — OpusHead layout after the 8-byte "OpusHead" magic:
                //   [0] version (1 byte), [1] channel count (1 byte),
                //   [2-3] pre_skip (little-endian uint16)  <- at packet bytes 10-11
                // pre_skip is the number of decoder samples to discard before presenting audio;
                // all granule->time conversions must subtract it (RFC 7845 §4.3).
                if (payload.Length >= OggOpusConstants.OpusHeadMinSize)
                    _preSkip = BinaryPrimitives.ReadUInt16LittleEndian(
                        payload.Slice(OggOpusConstants.OpusHeadPreSkipOffset, 2));
            }
            else if (_sawOpusHead && !_sawOpusTags && StartsWith(payload, OggOpusConstants.OpusTagsSignature))
            {
                _sawOpusTags = true;
                AdvanceCommentHeader(pageBytes);
            }
            else if (_commentHeaderOpen)
            {
                // Continuation page of a multi-page OpusTags packet: setup data, never an audio page.
                AdvanceCommentHeader(pageBytes);
            }
            else if (_sawOpusHead && _sawOpusTags)
            {
                AddAudioPage(granule, absoluteOffset);
            }
        }

        /// <summary>
        /// Records one page of the OpusTags packet. The setup header ends on the page where that packet
        /// completes; capture stops there so audio pages never grow it.
        /// </summary>
        private void AdvanceCommentHeader(ReadOnlySpan<byte> pageBytes)
        {
            _commentHeaderOpen = !EndsWithCompletedPacket(pageBytes);
            if (!_commentHeaderOpen)
                _capturingSetup = false;
        }

        /// <summary>
        /// True when the last packet on the page ends on this page. Per the Ogg lacing rules, a packet
        /// ends at the first lacing value below 255, so a final lacing value of 255 means the packet
        /// continues onto the next page. A page with no segments completes nothing.
        /// </summary>
        private static bool EndsWithCompletedPacket(ReadOnlySpan<byte> pageBytes)
        {
            int segmentCount = pageBytes[OggOpusConstants.PageSegmentCountOffset];
            return segmentCount > 0
                && pageBytes[OggOpusConstants.OggPageHeaderSize + segmentCount - 1] < 255;
        }

        /// <summary>
        /// Indexes one audio page. The first audio page carrying a granule position becomes the seek
        /// anchor at t=0; after that, one entry per 0.5 s bucket. A page with no end-granule (no packet
        /// completes on it, granulepos == -1) is skipped for indexing because its time is unknown. Its
        /// bytes still advance the caller's offset cursor.
        /// </summary>
        private void AddAudioPage(ulong granule, long absoluteOffset)
        {
            if (granule == OggOpusConstants.NoGranulePosition)
                return;

            // RFC 7845 §4.3: presentation time = max(0, granule - preSkip) / 48000.
            // Use this corrected time for bucketing so that a stream with pre-skip 3840 (~80 ms)
            // does not systematically offset every indexed time by that amount.
            var correctedTime = Math.Max(0.0,
                (granule - (double)_preSkip) / OggOpusConstants.OpusSampleRate);

            if (!_firstAudioPointTaken)
            {
                // Anchor the first seek point at corrected time = 0 by storing the granule as
                // preSkip. This guarantees that a binary search for t=0 ("largest entry with
                // corrected time <= 0") always resolves to this page's byte offset — even when the
                // real granule is slightly above preSkip due to encoder lead-in.
                _points.Add(new OpusSeekPoint(_preSkip, (ulong)absoluteOffset));
                _firstAudioPointTaken = true;
                _nextBucketTime = OggOpusConstants.SeekBucketSeconds;
            }
            else if (correctedTime >= _nextBucketTime)
            {
                _points.Add(new OpusSeekPoint(granule, (ulong)absoluteOffset));
                // Advance past every bucket this page crossed so a long page does not emit a
                // backlog of entries; the next bucket is the first boundary strictly after it.
                while (_nextBucketTime <= correctedTime)
                    _nextBucketTime += OggOpusConstants.SeekBucketSeconds;
            }

            _lastGranule = granule;
        }

        /// <summary>
        /// Produces the final walk, or null when the stream is rejected. That happens when there is no
        /// OpusHead, when the setup capture overflowed its cap (the header would be truncated), or when
        /// no audio seek points were recorded. No seek points covers a missing or unfinished OpusTags
        /// packet, because audio is only indexed after it completes.
        /// <paramref name="totalByteLength"/> is the full stream length, recorded for end-of-stream
        /// seek clamping.
        /// </summary>
        public OggOpusWalk? Finish(ulong totalByteLength)
        {
            if (!_sawOpusHead || _setupCaptureOverflowed || _points.Count == 0)
                return null;

            var setupHeader = _setupHeader.ToArray();
            // RFC 7845 §4.3: total duration is also pre-skip-corrected, matching the time a listener
            // experiences (the last indexed-capable audio page's corrected time, clamped to >= 0).
            var totalDuration = Math.Max(0.0,
                (_lastGranule - (double)_preSkip) / OggOpusConstants.OpusSampleRate);
            var index = new OggOpusSeekIndex(_points, totalDuration, totalByteLength, _preSkip);
            return new OggOpusWalk(setupHeader, index);
        }
    }
}