4351ae04be
Source read via streamed vault open + bounded staging copy (index-only duration/extension); encoded output walked from a bounded stream (new OggOpusParser.WalkAsync, byte-identical to the buffer oracle) and stored via streaming vault write. Adds parity tests.
311 lines
16 KiB
C#
311 lines
16 KiB
C#
using System.Buffers.Binary;
|
||
|
||
namespace DeepDrftContent.Processors.Opus;
|
||
|
||
/// <summary>
/// Outcome of a single pass over an encoded Ogg Opus stream: the setup header captured verbatim (the
/// leading <c>OpusHead</c> + <c>OpusTags</c> pages) together with the bucketed granule→byte seek index.
/// This is the complete content of the sidecar artifact (§3.4a); it is built at transcode time so that
/// delivery never has to re-walk the stream.
/// </summary>
/// <param name="SetupHeaderBytes">The leading setup pages (OpusHead + OpusTags), byte-for-byte as they
/// appear at the start of the stream, ready to be prepended to any mid-stream page run before decode.</param>
/// <param name="SeekIndex">The accurate granule→byte transfer function, bucketed at 0.5 s.</param>
public sealed record OggOpusWalk(
    byte[] SetupHeaderBytes,
    OggOpusSeekIndex SeekIndex);
|
||
|
||
/// <summary>
/// Pure Ogg-Opus stream walker. Reads the page structure directly (the <c>OggS</c> capture pattern and
/// the 27-byte page header) to (1) capture the setup-header pages and (2) record, for every audio page,
/// its end granule position and exact byte offset — bucketed to 0.5 s with each bucket boundary snapped
/// to the nearest enclosing page start. No external dependency: the encoder (FFmpeg) produces the bytes;
/// this turns them into the seek artifact deterministically, so it is unit-testable without a codec.
/// </summary>
/// <remarks>
/// Two entry points share one <see cref="WalkState"/> page-processing core, so they produce byte-identical
/// output by construction (the project's parity-oracle convention, mirroring
/// <c>RmsLoudnessAlgorithm.Compute</c> over its accumulator):
/// <list type="bullet">
/// <item><see cref="Walk(ReadOnlySpan{byte})"/> — the whole-buffer overload, retained as the byte-identity
/// parity oracle for the streaming variant.</item>
/// <item><see cref="WalkAsync(System.IO.Stream,System.Threading.CancellationToken)"/> — the streaming
/// variant: walks the page structure from a forward stream in a bounded read buffer (one Ogg page at a
/// time), so peak managed memory is O(buffer + seek-index + setup-header), independent of file size.</item>
/// </list>
/// </remarks>
|
||
public static class OggOpusParser
|
||
{
|
||
/// <summary>
/// Upper bound on the size of one Ogg page: the fixed page header, a completely full segment table
/// (255 entries) and the largest payload that table can describe (255 segments of 255 bytes each).
/// The streaming read buffer is never smaller than this, so a whole page always fits in it and a
/// page that still comes up short after a refill can only mean the stream was truncated.
/// </summary>
private const int MaxOggPageSize =
    OggOpusConstants.OggPageHeaderSize // fixed page header (27 bytes)
    + 255                              // one lacing value per segment, 255 segments at most
    + (255 * 255);                     // maximum payload; 65307 bytes in total
|
||
|
||
/// <summary>
/// Whole-buffer walk over <paramref name="oggBytes"/>. Frames the input page by page, hands every
/// page to the shared <see cref="WalkState"/> core and returns the resulting setup header + seek
/// index. Returns null instead of throwing when the bytes are not a recognisable Ogg Opus stream
/// (no setup header, no audio pages, or a truncated page structure); the caller treats a null as a
/// failed transcode and leaves the track lossless-only (C6).
/// </summary>
/// <param name="oggBytes">The complete encoded stream, starting on a page boundary.</param>
/// <returns>The captured setup header and seek index, or null for malformed/truncated input.</returns>
public static OggOpusWalk? Walk(ReadOnlySpan<byte> oggBytes)
{
    var walk = new WalkState();
    var cursor = 0;

    // A page is only parsed while a complete fixed header is still available; a shorter tail ends
    // the walk without being treated as an error.
    while (cursor + OggOpusConstants.OggPageHeaderSize <= oggBytes.Length)
    {
        var rest = oggBytes[cursor..];

        // Pages are written back to back, so anything other than the capture pattern here means the
        // stream is malformed (or an earlier page length was wrong). There is no resynchronisation.
        if (!rest[..4].SequenceEqual(OggOpusConstants.CapturePattern))
        {
            return null;
        }

        int segments = rest[OggOpusConstants.PageSegmentCountOffset];
        var payloadStart = OggOpusConstants.OggPageHeaderSize + segments;
        if (payloadStart > rest.Length)
        {
            return null; // segment table runs past the end of the input
        }

        // The payload length is the sum of the lacing values in the segment table.
        var payloadLength = 0;
        foreach (var lacing in rest.Slice(OggOpusConstants.OggPageHeaderSize, segments))
        {
            payloadLength += lacing;
        }

        var pageLength = payloadStart + payloadLength;
        if (pageLength > rest.Length)
        {
            return null; // payload runs past the end of the input
        }

        var wholePage = rest[..pageLength];
        var granule = BinaryPrimitives.ReadUInt64LittleEndian(
            wholePage.Slice(OggOpusConstants.GranulePositionOffset, 8));

        walk.AddPage(granule, wholePage[payloadStart..], wholePage, cursor);

        cursor += pageLength;
    }

    return walk.Finish((ulong)oggBytes.Length);
}
|
||
|
||
/// <summary>
/// Streaming counterpart of <see cref="Walk(ReadOnlySpan{byte})"/>. Reads the Ogg page structure from
/// a forward-only <paramref name="stream"/> through a bounded read buffer (one Ogg page at a time) and
/// produces a byte-identical <see cref="OggOpusWalk"/> without holding the whole encoded file in
/// memory. Malformed or truncated input yields null, exactly as in the buffer overload; bad input does
/// not throw (only <see cref="OperationCanceledException"/> propagates on cancellation).
/// </summary>
/// <param name="stream">The encoded stream, positioned at its first page.</param>
/// <param name="cancellationToken">Cancels the underlying reads.</param>
/// <returns>The captured setup header and seek index, or null for malformed/truncated input.</returns>
public static Task<OggOpusWalk?> WalkAsync(Stream stream, CancellationToken cancellationToken = default)
{
    // The default window is exactly one maximum-size page, the smallest size the core accepts.
    return WalkAsync(stream, bufferSize: MaxOggPageSize, cancellationToken: cancellationToken);
}
|
||
|
||
/// <summary>
/// Buffer-size-parameterised streaming core. <paramref name="bufferSize"/> is raised to at least
/// <see cref="MaxOggPageSize"/> so that any single page fits in the window. The window is compacted
/// after every page while a separate absolute cursor advances by the page size, so the offsets
/// handed to the seek index are absolute stream positions even though only a small window is held.
/// </summary>
/// <param name="stream">The encoded stream, positioned at its first page.</param>
/// <param name="bufferSize">Requested window size in bytes; values below the maximum page size are raised to it.</param>
/// <param name="cancellationToken">Cancels the underlying reads.</param>
/// <returns>The captured setup header and seek index, or null for malformed/truncated input.</returns>
internal static async Task<OggOpusWalk?> WalkAsync(Stream stream, int bufferSize, CancellationToken cancellationToken)
{
    var walk = new WalkState();
    var window = new byte[Math.Max(bufferSize, MaxOggPageSize)];
    var held = 0;         // number of valid bytes at window[0..held]
    long windowStart = 0; // absolute stream position of window[0]

    // Tops the window up only when it currently holds fewer than `needed` bytes. A false result
    // means the stream ended before that many bytes were available.
    async Task<bool> EnsureAsync(int needed)
    {
        if (held >= needed)
        {
            return true;
        }

        var added = await FillAsync(stream, window, held, cancellationToken);
        held += added;
        return held >= needed;
    }

    // Same termination rule as the buffer overload: a page is only parsed when a complete fixed
    // header is available; a shorter tail on a drained stream is the natural end of the walk.
    while (await EnsureAsync(OggOpusConstants.OggPageHeaderSize))
    {
        if (!window.AsSpan(0, 4).SequenceEqual(OggOpusConstants.CapturePattern))
        {
            return null; // not on a page boundary
        }

        int segments = window[OggOpusConstants.PageSegmentCountOffset];
        var payloadStart = OggOpusConstants.OggPageHeaderSize + segments;
        if (!await EnsureAsync(payloadStart))
        {
            return null; // truncated header
        }

        // The payload length is the sum of the lacing values in the segment table.
        var payloadLength = 0;
        for (var i = OggOpusConstants.OggPageHeaderSize; i < payloadStart; i++)
        {
            payloadLength += window[i];
        }

        var pageLength = payloadStart + payloadLength;
        if (!await EnsureAsync(pageLength))
        {
            return null; // truncated payload (page never fully arrived before EOF)
        }

        var wholePage = window.AsSpan(0, pageLength);
        var granule = BinaryPrimitives.ReadUInt64LittleEndian(
            wholePage.Slice(OggOpusConstants.GranulePositionOffset, 8));

        walk.AddPage(granule, wholePage.Slice(payloadStart, payloadLength), wholePage, windowStart);

        // Drop the consumed page from the front of the window and move the absolute cursor by the
        // same amount, so window[0] always corresponds to windowStart.
        var leftover = held - pageLength;
        if (leftover > 0)
        {
            Buffer.BlockCopy(window, pageLength, window, 0, leftover);
        }

        held = leftover;
        windowStart += pageLength;
    }

    // Any unparsed tail (shorter than a page header) still counts towards the total stream length.
    return walk.Finish((ulong)windowStart + (ulong)held);
}
|
||
|
||
/// <summary>
/// Reads from <paramref name="stream"/> into <paramref name="buffer"/>, starting at
/// <paramref name="offset"/>, and keeps issuing reads until either the buffer is full or a read
/// returns zero bytes (end of stream).
/// </summary>
/// <param name="stream">The stream to read from.</param>
/// <param name="buffer">The destination; bytes before <paramref name="offset"/> are left untouched.</param>
/// <param name="offset">Index of the first free byte in <paramref name="buffer"/>.</param>
/// <param name="ct">Cancels the underlying reads.</param>
/// <returns>The number of bytes appended to the buffer by this call.</returns>
private static async Task<int> FillAsync(Stream stream, byte[] buffer, int offset, CancellationToken ct)
{
    var writeAt = offset;

    // A single ReadAsync may return fewer bytes than requested, so loop until full or drained.
    while (writeAt < buffer.Length)
    {
        var got = await stream.ReadAsync(buffer.AsMemory(writeAt), ct);
        if (got == 0)
        {
            break;
        }

        writeAt += got;
    }

    return writeAt - offset;
}
|
||
|
||
/// <summary>
/// Reports whether <paramref name="payload"/> begins with the bytes of <paramref name="signature"/>.
/// A payload shorter than the signature never matches; an empty signature always matches.
/// </summary>
private static bool StartsWith(ReadOnlySpan<byte> payload, ReadOnlySpan<byte> signature)
{
    if (signature.Length > payload.Length)
    {
        return false;
    }

    return payload.Slice(0, signature.Length).SequenceEqual(signature);
}
|
||
|
||
/// <summary>
/// The single page-processing core that both <see cref="Walk(ReadOnlySpan{byte})"/> and
/// <see cref="WalkAsync(Stream,int,CancellationToken)"/> drive, page by page, in stream order. Keeping
/// the setup-header and seek-index accumulation here is what makes the two entry points byte-identical
/// by construction: there is exactly one copy of the OpusHead/OpusTags detection, pre-skip correction,
/// t=0 anchoring and 0.5 s bucketing logic.
/// </summary>
private sealed class WalkState
{
    // The real setup (OpusHead + OpusTags pages) is a few KB. This cap bounds the capture so a
    // malformed stream that never reaches OpusTags cannot grow it without limit. Hitting the cap
    // marks the whole walk as failed (Finish returns null), so the cap can never produce a result
    // whose setup header is silently truncated.
    private const int MaxSetupHeaderBytes = 8 * 1024 * 1024;

    private readonly List<byte> _setupHeader = new();
    private readonly List<OpusSeekPoint> _points = new();

    private bool _sawOpusHead;
    private bool _sawOpusTags;
    private ushort _preSkip;

    // Absolute end offset of the last setup page seen; -1 until OpusHead has been found. Only the
    // "has been set" property is consumed (by Finish). Kept as long so no narrowing cast is needed.
    private long _setupHeaderEnd = -1;

    private bool _capturingSetup = true;

    // Latched when the capture would exceed MaxSetupHeaderBytes; from then on the walk is void.
    private bool _setupOverflowed;

    private ulong _lastGranule;
    private double _nextBucketTime;
    private bool _firstAudioPointTaken;

    /// <summary>
    /// Processes one fully-framed page.
    /// </summary>
    /// <param name="granule">The page's granule position as read from its header.</param>
    /// <param name="payload">The page payload (the bytes after the segment table).</param>
    /// <param name="pageBytes">The whole page (header + segment table + payload), used for verbatim
    /// setup capture.</param>
    /// <param name="absoluteOffset">The page's absolute start in the stream — the value recorded in
    /// the seek index.</param>
    public void AddPage(ulong granule, ReadOnlySpan<byte> payload, ReadOnlySpan<byte> pageBytes, long absoluteOffset)
    {
        if (_setupOverflowed)
        {
            return; // the walk has already failed; nothing recorded from here on can be returned
        }

        if (_capturingSetup)
        {
            if (_setupHeader.Count + pageBytes.Length > MaxSetupHeaderBytes)
            {
                // Malformed: no OpusTags page within the cap. Previously only the capture stopped,
                // so a late OpusTags page could still yield a non-null result carrying a truncated
                // setup header. Fail the walk instead and release the partial capture.
                _capturingSetup = false;
                _setupOverflowed = true;
                _setupHeader.Clear();
                _setupHeader.TrimExcess();
                return;
            }

            _setupHeader.AddRange(pageBytes);
        }

        // The setup pages carry no audio granule (OpusHead has granulepos 0; OpusTags too). They
        // are the leading pages whose payload opens with the Opus magic signatures.
        if (!_sawOpusHead && StartsWith(payload, OggOpusConstants.OpusHeadSignature))
        {
            _sawOpusHead = true;
            _setupHeaderEnd = absoluteOffset + pageBytes.Length;

            // RFC 7845 §5.1 — OpusHead layout after the 8-byte "OpusHead" magic:
            //   [0] version (1 byte), [1] channel count (1 byte),
            //   [2-3] pre_skip (little-endian uint16), i.e. packet bytes 10-11.
            // pre_skip is the number of decoder samples to discard before presenting audio;
            // all granule→time conversions must subtract it (RFC 7845 §4.3).
            if (payload.Length >= OggOpusConstants.OpusHeadMinSize)
            {
                _preSkip = BinaryPrimitives.ReadUInt16LittleEndian(
                    payload.Slice(OggOpusConstants.OpusHeadPreSkipOffset, 2));
            }
        }
        else if (_sawOpusHead && !_sawOpusTags && StartsWith(payload, OggOpusConstants.OpusTagsSignature))
        {
            _sawOpusTags = true;
            _setupHeaderEnd = absoluteOffset + pageBytes.Length;

            // The setup header ends at the OpusTags page; stop capturing so audio pages never grow it.
            // NOTE(review): capture stops at the first page that opens with the OpusTags signature.
            // If a comment packet ever spans more than one page, the continuation pages are not
            // captured and fall into the audio branch below — confirm the encoder never emits that.
            _capturingSetup = false;
        }
        else if (_sawOpusHead && _sawOpusTags)
        {
            // Audio page. A page with no end-granule (mid-packet continuation, granulepos == -1)
            // is skipped for indexing because its time is unknown; the caller still advances the
            // byte cursor past it.
            if (granule == OggOpusConstants.NoGranulePosition)
            {
                return;
            }

            // Bucket on the pre-skip-corrected time so that a stream with pre-skip 3840 (~80 ms)
            // does not systematically offset every indexed time by that amount.
            var correctedTime = ToPresentationSeconds(granule);

            if (!_firstAudioPointTaken)
            {
                // Anchor the first seek point at corrected time = 0 by storing the granule as
                // preSkip. A search for t=0 ("largest entry with corrected time ≤ 0") then always
                // resolves to the first audio page's byte offset, even when the real granule is
                // slightly above preSkip due to encoder lead-in.
                _points.Add(new OpusSeekPoint(_preSkip, (ulong)absoluteOffset));
                _firstAudioPointTaken = true;
                _nextBucketTime = OggOpusConstants.SeekBucketSeconds;
            }
            else if (correctedTime >= _nextBucketTime)
            {
                _points.Add(new OpusSeekPoint(granule, (ulong)absoluteOffset));

                // Advance past every bucket this page crossed so a long page does not emit a
                // backlog of entries; the next bucket is the first boundary strictly after it.
                while (_nextBucketTime <= correctedTime)
                {
                    _nextBucketTime += OggOpusConstants.SeekBucketSeconds;
                }
            }

            _lastGranule = granule;
        }
    }

    /// <summary>
    /// Produces the final walk, or null when the stream was not usable: no OpusHead, a setup header
    /// that exceeded the capture cap, or no audio seek points.
    /// </summary>
    /// <param name="totalByteLength">The full stream length, recorded for end-of-stream seek clamping.</param>
    /// <returns>The setup header and seek index, or null for an unusable stream.</returns>
    public OggOpusWalk? Finish(ulong totalByteLength)
    {
        if (_setupOverflowed || !_sawOpusHead || _setupHeaderEnd < 0 || _points.Count == 0)
        {
            return null;
        }

        var setupHeader = _setupHeader.ToArray();

        // RFC 7845 §4.3: total duration is also pre-skip-corrected, matching the time a listener
        // experiences (the last audio page's corrected time, clamped to ≥ 0).
        var totalDuration = ToPresentationSeconds(_lastGranule);
        var index = new OggOpusSeekIndex(_points, totalDuration, totalByteLength, _preSkip);
        return new OggOpusWalk(setupHeader, index);
    }

    // RFC 7845 §4.3: presentation time = max(0, granule − preSkip) / sample rate.
    private double ToPresentationSeconds(ulong granule) =>
        Math.Max(0.0, (granule - (double)_preSkip) / OggOpusConstants.OpusSampleRate);
}
|
||
}
|