Files
deepdrft/DeepDrftTests/OggOpusStreamingWalkTests.cs
T
daniel-c-harvey 4351ae04be Stream Opus transcode source and encoded output; removes last store-path OOM
Source read via streamed vault open + bounded staging copy (index-only duration/extension); encoded output walked from a bounded stream (new OggOpusParser.WalkAsync, byte-identical to the buffer oracle) and stored via streaming vault write. Adds parity tests.
2026-06-26 14:06:33 -04:00

290 lines
13 KiB
C#

using System.Buffers.Binary;
using System.Text;
using DeepDrftContent.Processors.Opus;
namespace DeepDrftTests;
/// <summary>
/// Parity coverage for the streaming Ogg-Opus walk (<see cref="OggOpusParser.WalkAsync(System.IO.Stream,System.Threading.CancellationToken)"/>),
/// the OOM-fix counterpart of the whole-buffer <see cref="OggOpusParser.Walk(System.ReadOnlySpan{byte})"/>.
/// The buffer overload is the byte-identity parity oracle: the streaming walk must produce an identical
/// <see cref="OggOpusWalk"/> (setup header, every seek point, clamp totals, pre-skip) without ever holding
/// the whole encoded file in memory. The Ogg construction helpers mirror <see cref="OggOpusParserTests"/>.
/// </summary>
[TestFixture]
public class OggOpusStreamingWalkTests
{
private const ushort TestPreSkip = 312;
// Parity fixtures. Every case is a hand-assembled Ogg Opus byte stream aimed at one facet the buffer
// oracle reports: setup capture, t=0 anchoring with pre-skip, 0.5 s bucketing, and end-of-stream clamps.
private static IEnumerable<TestCaseData> ParityFixtures()
{
    // Wraps one fixture's bytes as a single-argument test case carrying its display name.
    static TestCaseData Named(string displayName, byte[] oggBytes) =>
        new TestCaseData(oggBytes).SetName(displayName);

    yield return Named(
        "Setup plus single audio page",
        Concat(OggPage(0, OpusHeadPacket()), OggPage(0, OpusTagsPacket()), OggPage(48000, AudioPacket(64))));

    yield return Named(
        "Non-zero pre-skip across three audio pages",
        BuildPreSkipStream());

    yield return Named(
        "Twenty quarter-second pages coalesced to half-second buckets",
        BuildBucketingStream(pageCount: 20, granuleStep: 12000, payload: 40));

    yield return Named(
        "Twelve half-second pages one entry each",
        BuildBucketingStream(pageCount: 12, granuleStep: 24000, payload: 30));
}
/// <summary>
/// Runs every parity fixture through both overloads and requires the streaming walk to equal the
/// buffer walk field for field.
/// </summary>
/// <param name="stream">
/// The complete fixture bytes: passed to the buffer overload directly and to the streaming overload
/// wrapped in a <see cref="MemoryStream"/>.
/// </param>
[TestCaseSource(nameof(ParityFixtures))]
public async Task WalkAsync_IsByteIdenticalToBufferWalk(byte[] stream)
{
    var expected = OggOpusParser.Walk(stream);

    // Check the oracle first, as the negative tests in this fixture do. Without this, a fixture the
    // oracle rejects would pass the parity comparison (null == null) and only be caught indirectly.
    Assert.That(expected, Is.Not.Null, "oracle precondition: these fixtures are all well-formed and must walk");

    using var source = new MemoryStream(stream);
    var actual = await OggOpusParser.WalkAsync(source);

    AssertWalksEqual(expected, actual);
    Assert.That(actual, Is.Not.Null, "These fixtures are all well-formed and must walk");
}
/// <summary>
/// Same parity check as the <see cref="MemoryStream"/> variant, but the streaming overload reads
/// from a source that hands back one byte per read.
/// </summary>
/// <param name="stream">The complete fixture bytes, fed to the oracle whole and to the streaming walk one byte at a time.</param>
[TestCaseSource(nameof(ParityFixtures))]
public async Task WalkAsync_IsByteIdenticalToBufferWalk_ViaTrickleStream(byte[] stream)
{
    // TrickleStream returns at most 1 byte per ReadAsync, so FillAsync's inner assembly loop
    // must iterate many times to accumulate each page header, segment table, and payload.
    // This exercises the partial-read path that MemoryStream never hits.
    var expected = OggOpusParser.Walk(stream);

    // Check the oracle first, as the negative tests in this fixture do. Without this, a fixture the
    // oracle rejects would pass the parity comparison (null == null) and only be caught indirectly.
    Assert.That(expected, Is.Not.Null, "oracle precondition: these fixtures are all well-formed and must walk");

    using var source = new TrickleStream(stream);
    var actual = await OggOpusParser.WalkAsync(source);

    AssertWalksEqual(expected, actual);
    Assert.That(actual, Is.Not.Null, "These fixtures are all well-formed and must walk via trickle");
}
/// <summary>
/// Builds a stream of at least 300,000 bytes out of 20,000-byte audio pages and requires the
/// streaming walk to match the buffer walk, then sanity-checks that the last recorded seek offset
/// lies deep in the stream.
/// </summary>
[Test]
public async Task WalkAsync_RecordsCorrectAbsoluteOffsets_WhenPageRunSpansMultipleBufferWindows()
{
    // A stream several times larger than the streaming read buffer (one Ogg page max, ~64 KB), so the
    // buffer window compacts and refills repeatedly. The absolute byte cursor must keep recorded seek
    // offsets correct across every window advance — proven by byte-identity with the buffer oracle,
    // which sees the whole stream at once.
    var pages = new List<byte[]> { OggPage(0, OpusHeadPacket()), OggPage(0, OpusTagsPacket()) };
    ulong granule = 0;
    var total = pages.Sum(p => p.Length);
    while (total < 300_000)
    {
        granule += 24000; // 0.5 s — one index entry per page
        var page = OggPage(granule, AudioPacket(20_000)); // large pages so several fill one window
        pages.Add(page);
        total += page.Length;
    }

    var stream = Concat(pages.ToArray());

    var expected = OggOpusParser.Walk(stream);

    // Check the oracle first, as the negative tests in this fixture do. If the oracle rejected this
    // fixture, the parity comparison would pass on null == null and hide the real cause.
    Assert.That(expected, Is.Not.Null, "oracle precondition: the multi-window fixture is well-formed and must walk");

    using var source = new MemoryStream(stream);
    var actual = await OggOpusParser.WalkAsync(source);

    AssertWalksEqual(expected, actual);
    Assert.That(actual, Is.Not.Null);
    Assert.Multiple(() =>
    {
        // Sanity: the fixture really does span multiple windows and the offsets advanced well past one.
        Assert.That(stream.Length, Is.GreaterThan(200_000));
        Assert.That(actual!.SeekIndex.Points.Count, Is.GreaterThan(5));
        Assert.That(actual.SeekIndex.Points[^1].ByteOffset, Is.GreaterThan(100_000UL),
            "The last seek offset must be an absolute position deep in the stream, not a window-relative one");
    });
}
/// <summary>
/// Input that is plain ASCII text rather than Ogg must yield <c>null</c> from the streaming walk,
/// exactly as it does from the buffer walk.
/// </summary>
[Test]
public async Task WalkAsync_NotAnOggStream_ReturnsNull_MatchingBufferOverload()
{
    byte[] plainText = Encoding.ASCII.GetBytes("this is not an ogg stream at all");

    // Establish what the oracle says before judging the streaming overload against it.
    var fromBuffer = OggOpusParser.Walk(plainText);
    Assert.That(fromBuffer, Is.Null, "oracle precondition");

    var fromStream = await OggOpusParser.WalkAsync(new MemoryStream(plainText));
    Assert.That(fromStream, Is.Null);
}
/// <summary>
/// A stream holding only the OpusHead and OpusTags pages, with no audio page after them, must yield
/// <c>null</c> from the streaming walk, exactly as it does from the buffer walk.
/// </summary>
[Test]
public async Task WalkAsync_SetupWithoutAudioPages_ReturnsNull_MatchingBufferOverload()
{
    var idHeaderPage = OggPage(0, OpusHeadPacket());
    var commentHeaderPage = OggPage(0, OpusTagsPacket());
    var setupOnly = Concat(idHeaderPage, commentHeaderPage);

    // Establish what the oracle says before judging the streaming overload against it.
    var fromBuffer = OggOpusParser.Walk(setupOnly);
    Assert.That(fromBuffer, Is.Null, "oracle precondition");

    var fromStream = await OggOpusParser.WalkAsync(new MemoryStream(setupOnly));
    Assert.That(fromStream, Is.Null);
}
/// <summary>
/// A stream whose last audio page loses the final five bytes of its payload must yield <c>null</c>
/// from the streaming walk, exactly as it does from the buffer walk.
/// </summary>
[Test]
public async Task WalkAsync_TruncatedPayload_ReturnsNull_MatchingBufferOverload()
{
    // Start from a well-formed stream: two setup pages followed by two 64-byte audio pages.
    var complete = Concat(
        OggPage(0, OpusHeadPacket()),
        OggPage(0, OpusTagsPacket()),
        OggPage(48000, AudioPacket(64)),
        OggPage(96000, AudioPacket(64)));

    // Cut five bytes off the end. The last page's header and segment table survive (so its declared
    // size is still readable), but its payload can never be completed.
    const int droppedTailBytes = 5;
    var truncated = complete.AsSpan(0, complete.Length - droppedTailBytes).ToArray();

    var fromBuffer = OggOpusParser.Walk(truncated);
    Assert.That(fromBuffer, Is.Null, "oracle precondition: truncated payload is unrecoverable");

    var fromStream = await OggOpusParser.WalkAsync(new MemoryStream(truncated));
    Assert.That(fromStream, Is.Null);
}
// ---- Assertions -------------------------------------------------------------------------
/// <summary>
/// Asserts that two walk results agree: both <c>null</c>, or both non-null with matching setup header
/// bytes, seek points, total duration, total byte length and pre-skip.
/// </summary>
/// <param name="expected">The result of the buffer overload, used as the oracle.</param>
/// <param name="actual">The result of the streaming overload under test.</param>
private static void AssertWalksEqual(OggOpusWalk? expected, OggOpusWalk? actual)
{
    if (expected is not null)
    {
        Assert.That(actual, Is.Not.Null, "Streaming walk must be non-null wherever the buffer oracle is non-null");

        // Grouped so one run reports every field that diverges, not just the first.
        Assert.Multiple(() =>
        {
            Assert.That(actual!.SetupHeaderBytes, Is.EqualTo(expected.SetupHeaderBytes),
                "Setup header bytes must be byte-identical");

            var oracleIndex = expected.SeekIndex;
            var streamedIndex = actual.SeekIndex;

            Assert.That(streamedIndex.Points, Is.EqualTo(oracleIndex.Points),
                "Every seek point (granule + absolute byte offset) must match");
            Assert.That(streamedIndex.TotalDurationSeconds, Is.EqualTo(oracleIndex.TotalDurationSeconds),
                "Pre-skip-corrected total duration must match");
            Assert.That(streamedIndex.TotalByteLength, Is.EqualTo(oracleIndex.TotalByteLength),
                "Total byte length (end-of-stream clamp) must match");
            Assert.That(streamedIndex.PreSkip, Is.EqualTo(oracleIndex.PreSkip),
                "Pre-skip must match");
        });
        return;
    }

    Assert.That(actual, Is.Null, "Streaming walk must be null wherever the buffer oracle is null");
}
// ---- Ogg stream construction helpers (mirrors OggOpusParserTests) -----------------------
/// <summary>
/// Assembles a stream whose OpusHead carries <see cref="TestPreSkip"/>, followed by an OpusTags page
/// and three 64-byte audio pages with granule positions 48000, 96000 and 144000.
/// </summary>
private static byte[] BuildPreSkipStream() =>
    Concat(
        OggPage(0, OpusHeadPacket(preSkip: TestPreSkip)),
        OggPage(0, OpusTagsPacket()),
        OggPage(48000, AudioPacket(64)),
        OggPage(96000, AudioPacket(64)),
        OggPage(144000, AudioPacket(64)));
/// <summary>
/// Assembles the two setup pages followed by <paramref name="pageCount"/> audio pages whose granule
/// positions rise in equal steps.
/// </summary>
/// <param name="pageCount">Number of audio pages appended after the setup pages.</param>
/// <param name="granuleStep">Granule increase per page; page <c>k</c> (1-based) ends at <c>k * granuleStep</c>.</param>
/// <param name="payload">Packet size of the first audio page; each later page is one byte larger than the one before.</param>
private static byte[] BuildBucketingStream(int pageCount, ulong granuleStep, int payload)
{
    var assembled = new List<byte[]>();
    assembled.Add(OggPage(0, OpusHeadPacket()));
    assembled.Add(OggPage(0, OpusTagsPacket()));

    for (var index = 0; index < pageCount; index++)
    {
        var endGranule = granuleStep * ((ulong)index + 1);
        var packetSize = payload + index;
        assembled.Add(OggPage(endGranule, AudioPacket(packetSize)));
    }

    return Concat(assembled.ToArray());
}
/// <summary>
/// Builds one Ogg page that carries <paramref name="packet"/> as a single complete packet.
/// </summary>
/// <remarks>
/// The segment table is a run of 255-valued lacing bytes followed by one terminating value below 255
/// (zero when the packet length is an exact multiple of 255). The page's segment count is stored in a
/// single byte, so a packet may be at most 254 * 255 + 254 = 65,024 bytes. Version, header-type flags,
/// sequence number and checksum are all written as zero; the serial number is fixed at 0xDEAD.
/// </remarks>
/// <param name="granule">Granule position written into the page header.</param>
/// <param name="packet">Packet bytes appended after the header and segment table.</param>
/// <returns>The page bytes: fixed header, segment table, then the packet.</returns>
/// <exception cref="ArgumentException">
/// <paramref name="packet"/> needs more than 255 lacing segments and therefore cannot fit in one page.
/// </exception>
private static byte[] OggPage(ulong granule, byte[] packet)
{
    const byte fullLacingValue = 255;
    const int maxSegmentsPerPage = 255;

    var fullSegments = packet.Length / fullLacingValue;
    var segmentCount = fullSegments + 1; // the terminating (< 255) lacing value is always present

    // Without this guard the count would be truncated by the byte cast below and the page would
    // silently come out malformed.
    if (segmentCount > maxSegmentsPerPage)
    {
        throw new ArgumentException(
            $"A {packet.Length}-byte packet needs {segmentCount} lacing segments, but one Ogg page holds at most {maxSegmentsPerPage}.",
            nameof(packet));
    }

    var header = new byte[OggOpusConstants.OggPageHeaderSize + segmentCount];
    OggOpusConstants.CapturePattern.CopyTo(header);
    header[4] = 0; // version
    header[5] = 0; // header-type flags
    BinaryPrimitives.WriteUInt64LittleEndian(header.AsSpan(OggOpusConstants.GranulePositionOffset, 8), granule);
    BinaryPrimitives.WriteUInt32LittleEndian(header.AsSpan(14, 4), 0xDEAD); // serial
    BinaryPrimitives.WriteUInt32LittleEndian(header.AsSpan(18, 4), 0); // sequence (unused by parser)
    BinaryPrimitives.WriteUInt32LittleEndian(header.AsSpan(22, 4), 0); // checksum (unverified)
    header[OggOpusConstants.PageSegmentCountOffset] = (byte)segmentCount;

    // Segment table: the full lacing values first, then the remainder that ends the packet.
    var lacing = header.AsSpan(OggOpusConstants.OggPageHeaderSize, segmentCount);
    lacing[..fullSegments].Fill(fullLacingValue);
    lacing[fullSegments] = (byte)(packet.Length % fullLacingValue);

    return Concat(header, packet);
}
/// <summary>
/// Builds an OpusHead packet: the signature followed by an 11-byte body declaring version 1,
/// 2 channels, the given pre-skip, a 48000 Hz input sample rate and channel mapping family 0.
/// </summary>
/// <param name="preSkip">Pre-skip value written little-endian at body offset 2.</param>
private static byte[] OpusHeadPacket(ushort preSkip = 0)
{
    // Body layout by offset: [0] version, [1] channels, [2..3] pre_skip, [4..7] input sample rate,
    // [8..9] left zero, [10] channel mapping family.
    var body = new byte[] { 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
    var fields = body.AsSpan();

    BinaryPrimitives.WriteUInt16LittleEndian(fields.Slice(2, 2), preSkip);
    BinaryPrimitives.WriteUInt32LittleEndian(fields.Slice(4, 4), 48000);

    return Concat(OggOpusConstants.OpusHeadSignature.ToArray(), body);
}
/// <summary>
/// Builds a minimal OpusTags packet: the signature, a 32-bit vendor-string length, the ASCII vendor
/// string <c>"test"</c>, and a 32-bit user-comment count of zero.
/// </summary>
/// <remarks>
/// Both 32-bit fields are written explicitly little-endian with <see cref="BinaryPrimitives"/>, like
/// every other multi-byte field built in this fixture, so the bytes do not depend on host endianness.
/// </remarks>
private static byte[] OpusTagsPacket()
{
    var vendor = Encoding.ASCII.GetBytes("test");

    var vendorLength = new byte[sizeof(uint)];
    BinaryPrimitives.WriteUInt32LittleEndian(vendorLength, (uint)vendor.Length);

    var userCommentCount = new byte[sizeof(uint)]; // zero comments: all four bytes stay 0

    return Concat(OggOpusConstants.OpusTagsSignature.ToArray(), vendorLength, vendor, userCommentCount);
}
/// <summary>
/// Produces a packet of <paramref name="size"/> bytes filled with the repeating pattern 0, 1, …, 255.
/// </summary>
/// <param name="size">Number of bytes in the packet.</param>
private static byte[] AudioPacket(int size)
{
    var bytes = new byte[size];
    var position = 0;
    while (position < bytes.Length)
    {
        // The index is never negative, so the remainder is the low eight bits of the index.
        bytes[position] = (byte)(position % 256);
        position++;
    }

    return bytes;
}
/// <summary>
/// Joins the given byte arrays, in order, into one newly allocated array.
/// </summary>
/// <param name="parts">Arrays to join; an empty list yields an empty array.</param>
private static byte[] Concat(params byte[][] parts)
{
    var joined = new byte[parts.Sum(part => part.Length)];

    var writeAt = 0;
    for (var index = 0; index < parts.Length; index++)
    {
        var segment = parts[index];
        segment.AsSpan().CopyTo(joined.AsSpan(writeAt, segment.Length));
        writeAt += segment.Length;
    }

    return joined;
}
// ---- TrickleStream -----------------------------------------------------------------------
/// <summary>
/// A non-seekable, read-only stream over a byte array that returns at most 1 byte per
/// <see cref="Read(byte[], int, int)"/> or <see cref="ReadAsync(Memory{byte}, CancellationToken)"/>
/// call. This drives every <c>FillAsync</c> invocation through its inner assembly loop, exercising the
/// partial-read path that <see cref="MemoryStream"/> never hits because it satisfies requests in one shot.
/// </summary>
/// <remarks>
/// <see cref="ReadAsync(Memory{byte}, CancellationToken)"/> always completes synchronously. When the
/// token is already cancelled it returns a cancelled task and consumes no data.
/// </remarks>
private sealed class TrickleStream(byte[] data) : Stream
{
    // Index into data of the next byte to hand out.
    private int _position;

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();

    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    /// <summary>Copies one byte into <paramref name="buffer"/> at <paramref name="offset"/>, or returns 0 at end of data or when <paramref name="count"/> is 0.</summary>
    public override int Read(byte[] buffer, int offset, int count)
    {
        if (count == 0 || _position >= data.Length) return 0;
        buffer[offset] = data[_position++];
        return 1;
    }

    /// <summary>Copies one byte into the start of <paramref name="buffer"/>, or completes with 0 at end of data or when the buffer is empty.</summary>
    public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
    {
        // Honor the token up front so a cancelled caller is not handed more data.
        if (cancellationToken.IsCancellationRequested)
        {
            return ValueTask.FromCanceled<int>(cancellationToken);
        }

        if (buffer.IsEmpty || _position >= data.Length) return new ValueTask<int>(0);
        buffer.Span[0] = data[_position++];
        return new ValueTask<int>(1);
    }

    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}
}