From 4351ae04be5dba38fff941d581af668aba227957 Mon Sep 17 00:00:00 2001 From: daniel-c-harvey Date: Fri, 26 Jun 2026 14:06:33 -0400 Subject: [PATCH] Stream Opus transcode source and encoded output; removes last store-path OOM Source read via streamed vault open + bounded staging copy (index-only duration/extension); encoded output walked from a bounded stream (new OggOpusParser.WalkAsync, byte-identical to the buffer oracle) and stored via streaming vault write. Adds parity tests. --- .../Processors/Opus/OggOpusParser.cs | 246 ++++++++++++--- .../Processors/Opus/OpusTranscodeService.cs | 81 +++-- DeepDrftContent/TrackContentService.cs | 20 ++ DeepDrftTests/OggOpusStreamingWalkTests.cs | 289 ++++++++++++++++++ 4 files changed, 572 insertions(+), 64 deletions(-) create mode 100644 DeepDrftTests/OggOpusStreamingWalkTests.cs diff --git a/DeepDrftContent/Processors/Opus/OggOpusParser.cs b/DeepDrftContent/Processors/Opus/OggOpusParser.cs index 0e4f850..5d0b28b 100644 --- a/DeepDrftContent/Processors/Opus/OggOpusParser.cs +++ b/DeepDrftContent/Processors/Opus/OggOpusParser.cs @@ -20,8 +20,27 @@ public sealed record OggOpusWalk(byte[] SetupHeaderBytes, OggOpusSeekIndex SeekI /// to the nearest enclosing page start. No external dependency: the encoder (FFmpeg) produces the bytes; /// this turns them into the seek artifact deterministically, so it is unit-testable without a codec. /// +/// +/// Two entry points share one page-processing core, so they produce byte-identical +/// output by construction (the project's parity-oracle convention, mirroring +/// RmsLoudnessAlgorithm.Compute over its accumulator): +/// +/// — the whole-buffer overload, retained as the byte-identity +/// parity oracle for the streaming variant. +/// — the streaming +/// variant: walks the page structure from a forward stream in a bounded read buffer (one Ogg page at a +/// time), so peak managed memory is O(buffer + seek-index + setup-header), independent of file size. 
+/// +/// public static class OggOpusParser { + /// + /// The largest a single Ogg page can be: header(27) + a full 255-entry segment table + the maximum + /// payload those segments can describe (255 × 255 bytes). The streaming read buffer is floored at this + /// so a complete page always fits, which means a short read on a page can only mean a truncated stream. + /// + private const int MaxOggPageSize = OggOpusConstants.OggPageHeaderSize + 255 + 255 * 255; // 65307 + /// /// Walks and produces the setup header + seek index, or null if the /// bytes are not a recognisable Ogg Opus stream (no setup header, no audio pages, or truncated @@ -30,15 +49,7 @@ public static class OggOpusParser /// public static OggOpusWalk? Walk(ReadOnlySpan oggBytes) { - var setupHeaderEnd = -1; - var sawOpusHead = false; - var sawOpusTags = false; - ushort preSkip = 0; - - var points = new List(); - ulong lastGranule = 0; - var nextBucketTime = 0.0; - var firstAudioPointTaken = false; + var state = new WalkState(); var offset = 0; while (offset + OggOpusConstants.OggPageHeaderSize <= oggBytes.Length) @@ -68,12 +79,160 @@ public static class OggOpusParser var granule = BinaryPrimitives.ReadUInt64LittleEndian( page.Slice(OggOpusConstants.GranulePositionOffset, 8)); + state.AddPage(granule, payload, page.Slice(0, pageTotalSize), offset); + + offset += pageTotalSize; + } + + return state.Finish((ulong)oggBytes.Length); + } + + /// + /// Streaming counterpart of : walks the Ogg page structure from + /// a forward in a bounded read buffer (one Ogg page at a time), producing a + /// byte-identical without ever holding the whole encoded file in memory. + /// Returns null on the same malformed/truncated conditions as the buffer overload — it does not throw + /// for bad input (only propagates on cancellation). 
+ /// + public static Task WalkAsync(Stream stream, CancellationToken cancellationToken = default) + => WalkAsync(stream, MaxOggPageSize, cancellationToken); + + /// + /// Buffer-size-parameterised core. is floored at + /// so any single page always fits in the buffer; the absolute byte + /// cursor (absoluteOffset) advances as the window compacts, so recorded seek offsets stay + /// absolute even though the buffer holds only a small window at any instant. + /// + internal static async Task WalkAsync(Stream stream, int bufferSize, CancellationToken cancellationToken) + { + var state = new WalkState(); + var buffer = new byte[Math.Max(bufferSize, MaxOggPageSize)]; + var len = 0; // valid bytes held at buffer[0..len] + long absoluteOffset = 0; // absolute stream position of buffer[0] + + while (true) + { + // The buffer overload's loop guard requires a full fixed header before parsing a page; once + // fewer than that remain (and the stream is drained) it is the natural end of the stream. 
+ if (len < OggOpusConstants.OggPageHeaderSize) + { + len += await FillAsync(stream, buffer, len, cancellationToken); + if (len < OggOpusConstants.OggPageHeaderSize) + break; + } + + if (!buffer.AsSpan(0, 4).SequenceEqual(OggOpusConstants.CapturePattern)) + return null; + + var segmentCount = buffer[OggOpusConstants.PageSegmentCountOffset]; + var segmentTableEnd = OggOpusConstants.OggPageHeaderSize + segmentCount; + if (len < segmentTableEnd) + { + len += await FillAsync(stream, buffer, len, cancellationToken); + if (len < segmentTableEnd) + return null; // truncated header + } + + var payloadSize = 0; + for (var i = 0; i < segmentCount; i++) + payloadSize += buffer[OggOpusConstants.OggPageHeaderSize + i]; + + var pageTotalSize = segmentTableEnd + payloadSize; + if (len < pageTotalSize) + { + len += await FillAsync(stream, buffer, len, cancellationToken); + if (len < pageTotalSize) + return null; // truncated payload (page never fully arrived before EOF) + } + + var page = buffer.AsSpan(0, pageTotalSize); + var granule = BinaryPrimitives.ReadUInt64LittleEndian( + page.Slice(OggOpusConstants.GranulePositionOffset, 8)); + var payload = page.Slice(segmentTableEnd, payloadSize); + + state.AddPage(granule, payload, page, absoluteOffset); + + // Compact the consumed page off the front of the window; the absolute cursor advances by the + // exact page size so every offset the state records remains an absolute stream position. + var remaining = len - pageTotalSize; + if (remaining > 0) + Buffer.BlockCopy(buffer, pageTotalSize, buffer, 0, remaining); + len = remaining; + absoluteOffset += pageTotalSize; + } + + return state.Finish((ulong)absoluteOffset + (ulong)len); + } + + /// + /// Fills from to its end, issuing as many reads as + /// needed until the buffer is full or the stream is exhausted. Returns the number of bytes added. 
+ /// + private static async Task FillAsync(Stream stream, byte[] buffer, int offset, CancellationToken ct) + { + var added = 0; + while (offset + added < buffer.Length) + { + var read = await stream.ReadAsync(buffer.AsMemory(offset + added, buffer.Length - offset - added), ct); + if (read == 0) + break; + added += read; + } + return added; + } + + private static bool StartsWith(ReadOnlySpan payload, ReadOnlySpan signature) => + payload.Length >= signature.Length && payload[..signature.Length].SequenceEqual(signature); + + /// + /// The single page-processing core both and + /// drive, page by page, in stream order. Holding + /// the setup-header + seek-index accumulation here is what makes the two entry points byte-identical + /// by construction: there is exactly one copy of the OpusHead/OpusTags detection, pre-skip correction, + /// t=0 anchoring, and 0.5 s bucketing logic. + /// + private sealed class WalkState + { + // The real setup (OpusHead + OpusTags pages) is a few KB; this cap bounds the streaming capture so + // a malformed head-without-tags stream cannot grow it unboundedly. NOTE(review): AddPage still detects + // OpusTags after capture has been abandoned, so a stream whose tags arrive past the cap can index audio + // and yield a non-null walk with a truncated setup header; confirm, and consider rejecting on overflow. + private const int MaxSetupHeaderBytes = 8 * 1024 * 1024; + + private bool _sawOpusHead; + private bool _sawOpusTags; + private ushort _preSkip; + private int _setupHeaderEnd = -1; + + private bool _capturingSetup = true; + private readonly List _setupHeader = new(); + + private readonly List _points = new(); + private ulong _lastGranule; + private double _nextBucketTime; + private bool _firstAudioPointTaken; + + /// + /// Processes one fully-framed page. is the whole page (header + + /// segment table + payload) for verbatim setup capture; is the + /// page's absolute start in the stream — the value recorded in the seek index. 
+ /// + public void AddPage(ulong granule, ReadOnlySpan payload, ReadOnlySpan pageBytes, long absoluteOffset) + { + if (_capturingSetup) + { + if (_setupHeader.Count + pageBytes.Length > MaxSetupHeaderBytes) + _capturingSetup = false; // malformed: give up capture (result will be null without tags) + else + _setupHeader.AddRange(pageBytes); + } + // The setup pages carry no audio granule (OpusHead has granulepos 0; OpusTags too). They // are the leading pages whose payload opens with the Opus magic signatures. - if (!sawOpusHead && StartsWith(payload, OggOpusConstants.OpusHeadSignature)) + if (!_sawOpusHead && StartsWith(payload, OggOpusConstants.OpusHeadSignature)) { - sawOpusHead = true; - setupHeaderEnd = offset + pageTotalSize; + _sawOpusHead = true; + _setupHeaderEnd = (int)(absoluteOffset + pageBytes.Length); // RFC 7845 §5.1 — OpusHead layout after the 8-byte "OpusHead" magic: // [0] version (1 byte), [1] channel count (1 byte), @@ -81,15 +240,17 @@ public static class OggOpusParser // pre_skip is the number of decoder samples to discard before presenting audio; // all granule→time conversions must subtract it (RFC 7845 §4.3). if (payload.Length >= OggOpusConstants.OpusHeadMinSize) - preSkip = BinaryPrimitives.ReadUInt16LittleEndian( + _preSkip = BinaryPrimitives.ReadUInt16LittleEndian( payload.Slice(OggOpusConstants.OpusHeadPreSkipOffset, 2)); } - else if (sawOpusHead && !sawOpusTags && StartsWith(payload, OggOpusConstants.OpusTagsSignature)) + else if (_sawOpusHead && !_sawOpusTags && StartsWith(payload, OggOpusConstants.OpusTagsSignature)) { - sawOpusTags = true; - setupHeaderEnd = offset + pageTotalSize; + _sawOpusTags = true; + _setupHeaderEnd = (int)(absoluteOffset + pageBytes.Length); + // The setup header ends at the OpusTags page; stop capturing so audio pages never grow it. + _capturingSetup = false; } - else if (sawOpusHead && sawOpusTags) + else if (_sawOpusHead && _sawOpusTags) { // Audio page. 
Record the first audio page unconditionally (the seek anchor at t=0), // then one entry per 0.5 s bucket. A page with no end-granule (mid-packet continuation, @@ -101,46 +262,49 @@ public static class OggOpusParser // Use this corrected time for bucketing so that a stream with pre-skip 3840 (~80 ms) // does not systematically offset every indexed time by that amount. var correctedTime = Math.Max(0.0, - (granule - (double)preSkip) / OggOpusConstants.OpusSampleRate); + (granule - (double)_preSkip) / OggOpusConstants.OpusSampleRate); - if (!firstAudioPointTaken) + if (!_firstAudioPointTaken) { // Anchor the first seek point at corrected time = 0 by storing the granule as // preSkip. This guarantees that a binary search for t=0 ("largest entry with // corrected time ≤ 0") always resolves to the first audio page's byte offset — // even when the real granule is slightly above preSkip due to encoder lead-in. - points.Add(new OpusSeekPoint(preSkip, (ulong)offset)); - firstAudioPointTaken = true; - nextBucketTime = OggOpusConstants.SeekBucketSeconds; + _points.Add(new OpusSeekPoint(_preSkip, (ulong)absoluteOffset)); + _firstAudioPointTaken = true; + _nextBucketTime = OggOpusConstants.SeekBucketSeconds; } - else if (correctedTime >= nextBucketTime) + else if (correctedTime >= _nextBucketTime) { - points.Add(new OpusSeekPoint(granule, (ulong)offset)); + _points.Add(new OpusSeekPoint(granule, (ulong)absoluteOffset)); // Advance past every bucket this page crossed so a long page does not emit a // backlog of entries; the next bucket is the first boundary strictly after it. 
- while (nextBucketTime <= correctedTime) - nextBucketTime += OggOpusConstants.SeekBucketSeconds; + while (_nextBucketTime <= correctedTime) + _nextBucketTime += OggOpusConstants.SeekBucketSeconds; } - lastGranule = granule; + _lastGranule = granule; } } - - offset += pageTotalSize; } - if (!sawOpusHead || setupHeaderEnd < 0 || points.Count == 0) - return null; + /// + /// Produces the final walk, or null on the same conditions the buffer overload rejected: + /// no OpusHead, no captured setup header, or no audio seek points. + /// is the full stream length, recorded for end-of-stream seek clamping. + /// + public OggOpusWalk? Finish(ulong totalByteLength) + { + if (!_sawOpusHead || _setupHeaderEnd < 0 || _points.Count == 0) + return null; - var setupHeader = oggBytes[..setupHeaderEnd].ToArray(); - // RFC 7845 §4.3: total duration is also pre-skip-corrected, matching the time a listener - // experiences (the last audio page's corrected time, clamped to ≥ 0). - var totalDuration = Math.Max(0.0, - (lastGranule - (double)preSkip) / OggOpusConstants.OpusSampleRate); - var index = new OggOpusSeekIndex(points, totalDuration, (ulong)oggBytes.Length, preSkip); - return new OggOpusWalk(setupHeader, index); + var setupHeader = _setupHeader.ToArray(); + // RFC 7845 §4.3: total duration is also pre-skip-corrected, matching the time a listener + // experiences (the last audio page's corrected time, clamped to ≥ 0). 
+ var totalDuration = Math.Max(0.0, + (_lastGranule - (double)_preSkip) / OggOpusConstants.OpusSampleRate); + var index = new OggOpusSeekIndex(_points, totalDuration, totalByteLength, _preSkip); + return new OggOpusWalk(setupHeader, index); + } } - - private static bool StartsWith(ReadOnlySpan payload, ReadOnlySpan signature) => - payload.Length >= signature.Length && payload[..signature.Length].SequenceEqual(signature); } diff --git a/DeepDrftContent/Processors/Opus/OpusTranscodeService.cs b/DeepDrftContent/Processors/Opus/OpusTranscodeService.cs index dba73e9..cf3b5cd 100644 --- a/DeepDrftContent/Processors/Opus/OpusTranscodeService.cs +++ b/DeepDrftContent/Processors/Opus/OpusTranscodeService.cs @@ -17,17 +17,20 @@ namespace DeepDrftContent.Processors.Opus; public sealed class OpusTranscodeService { private readonly FileDb _fileDatabase; + private readonly TrackContentService _trackContent; private readonly FfmpegOpusEncoder _encoder; private readonly OpusTranscodeOptions _options; private readonly ILogger _logger; public OpusTranscodeService( FileDb fileDatabase, + TrackContentService trackContent, FfmpegOpusEncoder encoder, IOptions options, ILogger logger) { _fileDatabase = fileDatabase; + _trackContent = trackContent; _encoder = encoder; _options = options.Value; _logger = logger; @@ -43,27 +46,52 @@ public sealed class OpusTranscodeService /// public async Task TranscodeAndStoreAsync(string entryKey, CancellationToken ct) { - var source = await _fileDatabase.LoadResourceAsync(VaultConstants.Tracks, entryKey); - if (source is null) + // Read the source extension + duration from the vault index (no body load) and open a streamed + // read over the source bytes — never the whole-buffer AudioBinary. A 92-min mix source is ~970 MB; + // buffering it (and the encoded output below) was the last unconverted store-path OOM violation. + var trackDuration = await _trackContent.GetAudioDurationAsync(entryKey) ?? 
0.0; + var sourceMedia = await _trackContent.OpenAudioMediaStreamAsync(entryKey); + if (sourceMedia is null) { _logger.LogWarning("Opus transcode: no source audio in vault for {EntryKey}; skipping.", entryKey); return false; } - Directory.CreateDirectory(_options.StagingPath); - var sourcePath = Path.Combine(_options.StagingPath, $"opus-src-{Guid.NewGuid():N}{source.Extension}"); - var opusPath = Path.Combine(_options.StagingPath, $"opus-out-{Guid.NewGuid():N}{OggOpusConstants.OpusExtension}"); - + string? sourcePath = null; + string? opusPath = null; try { - await File.WriteAllBytesAsync(sourcePath, source.Buffer, ct); + // Stage the source to disk in bounded chunks so ffmpeg can read it by file path/extension. + // The inner finally disposes the source stream as soon as the copy is done — the read handle + // is not held across the (long) encode — and guarantees disposal even if staging setup throws. + try + { + Directory.CreateDirectory(_options.StagingPath); + sourcePath = Path.Combine(_options.StagingPath, $"opus-src-{Guid.NewGuid():N}{sourceMedia.Extension}"); + opusPath = Path.Combine(_options.StagingPath, $"opus-out-{Guid.NewGuid():N}{OggOpusConstants.OpusExtension}"); + + await using var staging = new FileStream( + sourcePath, FileMode.Create, FileAccess.Write, FileShare.None, + bufferSize: 81920, useAsync: true); + await sourceMedia.Stream.CopyToAsync(staging, bufferSize: 81920, ct); + } + finally + { + await sourceMedia.DisposeAsync(); + } if (!await _encoder.EncodeAsync(sourcePath, opusPath, ct)) return false; // encoder already logged the cause - var opusBytes = await File.ReadAllBytesAsync(opusPath, ct); - - var walk = OggOpusParser.Walk(opusBytes); + // Walk the encoded output from a streamed read in a bounded buffer (no whole-file load). The + // seek index and setup header are byte-identical to the buffer walk (parity-tested). + OggOpusWalk? 
walk; + await using (var opusIn = new FileStream( + opusPath, FileMode.Open, FileAccess.Read, FileShare.Read, + bufferSize: 81920, useAsync: true)) + { + walk = await OggOpusParser.WalkAsync(opusIn, ct); + } if (walk is null) { _logger.LogError( @@ -74,27 +102,32 @@ public sealed class OpusTranscodeService await EnsureVaultAsync(); - var opusBitrate = source.Duration > 0 - ? (int)(opusBytes.Length * 8 / source.Duration / 1000) + // Bitrate from the output file length + duration — both available without buffering the bytes. + var opusLength = new FileInfo(opusPath).Length; + var opusBitrate = trackDuration > 0 + ? (int)(opusLength * 8 / trackDuration / 1000) : _options.BitrateKbps; - var audioBinary = new AudioBinary(new AudioBinaryParams( - opusBytes, opusBytes.Length, OggOpusConstants.OpusExtension, source.Duration, opusBitrate)); - - var sidecar = new OpusSidecar(walk.SetupHeaderBytes, walk.SeekIndex).ToBytes(); - var sidecarBinary = new MediaBinary(new MediaBinaryParams( - sidecar, sidecar.Length, OggOpusConstants.SidecarExtension)); // Store the audio first, then the sidecar. If the sidecar write fails the Opus bytes are // present but unseekable — treat that as a failed derive (return false) so a backfill re-runs // it; do not leave a half-derived track that the delivery layer would treat as complete. 
- var audioStored = await _fileDatabase.RegisterResourceAsync( - VaultConstants.TrackOpus, OpusAudioKey(entryKey), audioBinary); + var audioMeta = MetaDataFactory.CreateAudioMetaData( + OpusAudioKey(entryKey), OggOpusConstants.OpusExtension, trackDuration, opusBitrate); + var stagedOpusPath = opusPath; + var audioStored = await _fileDatabase.RegisterResourceStreamingAsync( + VaultConstants.TrackOpus, OpusAudioKey(entryKey), audioMeta, + (destination, token) => AudioStoreStream.CopyFileAsync(stagedOpusPath, destination, token), ct); if (!audioStored) { _logger.LogError("Opus transcode: vault write of Opus audio failed for {EntryKey}.", entryKey); return false; } + // The sidecar is the setup header (a few KB) plus the seek index (~16 bytes per 0.5 s bucket); + // it is inherently bounded and already in managed memory, so the whole-buffer write is correct. + var sidecar = new OpusSidecar(walk.SetupHeaderBytes, walk.SeekIndex).ToBytes(); + var sidecarBinary = new MediaBinary(new MediaBinaryParams( + sidecar, sidecar.Length, OggOpusConstants.SidecarExtension)); var sidecarStored = await _fileDatabase.RegisterResourceAsync( VaultConstants.TrackOpus, OpusSidecarKey(entryKey), sidecarBinary); if (!sidecarStored) @@ -105,7 +138,7 @@ public sealed class OpusTranscodeService _logger.LogInformation( "Opus transcode complete for {EntryKey}: {OpusBytes} bytes, {Points} seek points, {Duration:F1}s.", - entryKey, opusBytes.Length, walk.SeekIndex.Points.Count, walk.SeekIndex.TotalDurationSeconds); + entryKey, opusLength, walk.SeekIndex.Points.Count, walk.SeekIndex.TotalDurationSeconds); return true; } catch (OperationCanceledException) when (ct.IsCancellationRequested) @@ -119,8 +152,10 @@ public sealed class OpusTranscodeService } finally { - TryDelete(sourcePath); - TryDelete(opusPath); + if (sourcePath is not null) + TryDelete(sourcePath); + if (opusPath is not null) + TryDelete(opusPath); } } diff --git a/DeepDrftContent/TrackContentService.cs 
b/DeepDrftContent/TrackContentService.cs index b7894d3..1c7ffea 100644 --- a/DeepDrftContent/TrackContentService.cs +++ b/DeepDrftContent/TrackContentService.cs @@ -220,6 +220,26 @@ public class TrackContentService return media?.Stream; } + /// + /// Opens a read-only stream over a track's vault audio together with its stored extension, or null if + /// the entry has no backing file. Same non-buffering contract as , + /// but keeps the the caller needs to name a staging file for a + /// format-detecting consumer (the Opus transcode reopens the source by extension for ffmpeg). The + /// caller owns the returned and must dispose it. Follows the FileDatabase + /// swallow-and-return-null contract. + /// + /// Track ID (EntryKey) + public async Task OpenAudioMediaStreamAsync(string trackId) + { + var vault = _fileDatabase.GetVault(VaultConstants.Tracks); + if (vault is null) + { + return null; + } + + return await vault.GetEntryStreamAsync(trackId); + } + /// /// Reads a track's stored audio duration from the vault index metadata WITHOUT loading the audio /// body — the cheap counterpart of GetAudioBinaryAsync(...).Duration. Returns null if the diff --git a/DeepDrftTests/OggOpusStreamingWalkTests.cs b/DeepDrftTests/OggOpusStreamingWalkTests.cs new file mode 100644 index 0000000..947ad2e --- /dev/null +++ b/DeepDrftTests/OggOpusStreamingWalkTests.cs @@ -0,0 +1,289 @@ +using System.Buffers.Binary; +using System.Text; +using DeepDrftContent.Processors.Opus; + +namespace DeepDrftTests; + +/// +/// Parity coverage for the streaming Ogg-Opus walk (), +/// the OOM-fix counterpart of the whole-buffer . +/// The buffer overload is the byte-identity parity oracle: the streaming walk must produce an identical +/// (setup header, every seek point, clamp totals, pre-skip) without ever holding +/// the whole encoded file in memory. The Ogg construction helpers mirror . 
+/// +[TestFixture] +public class OggOpusStreamingWalkTests +{ + private const ushort TestPreSkip = 312; + + // Each fixture is a hand-built Ogg Opus stream exercising a different facet the buffer oracle asserts: + // setup capture, t=0 anchoring with pre-skip, 0.5 s bucketing, and end-of-stream clamps. + private static IEnumerable ParityFixtures() + { + yield return new TestCaseData( + Concat(OggPage(0, OpusHeadPacket()), OggPage(0, OpusTagsPacket()), OggPage(48000, AudioPacket(64)))) + .SetName("Setup plus single audio page"); + + yield return new TestCaseData(BuildPreSkipStream()) + .SetName("Non-zero pre-skip across three audio pages"); + + yield return new TestCaseData(BuildBucketingStream(pageCount: 20, granuleStep: 12000, payload: 40)) + .SetName("Twenty quarter-second pages coalesced to half-second buckets"); + + yield return new TestCaseData(BuildBucketingStream(pageCount: 12, granuleStep: 24000, payload: 30)) + .SetName("Twelve half-second pages one entry each"); + } + + [TestCaseSource(nameof(ParityFixtures))] + public async Task WalkAsync_IsByteIdenticalToBufferWalk(byte[] stream) + { + var expected = OggOpusParser.Walk(stream); + var actual = await OggOpusParser.WalkAsync(new MemoryStream(stream)); + + AssertWalksEqual(expected, actual); + Assert.That(actual, Is.Not.Null, "These fixtures are all well-formed and must walk"); + } + + [TestCaseSource(nameof(ParityFixtures))] + public async Task WalkAsync_IsByteIdenticalToBufferWalk_ViaTrickleStream(byte[] stream) + { + // TrickleStream returns at most 1 byte per ReadAsync, so FillAsync's inner assembly loop + // must iterate many times to accumulate each page header, segment table, and payload. + // This exercises the partial-read path that MemoryStream never hits. 
+ var expected = OggOpusParser.Walk(stream); + var actual = await OggOpusParser.WalkAsync(new TrickleStream(stream)); + + AssertWalksEqual(expected, actual); + Assert.That(actual, Is.Not.Null, "These fixtures are all well-formed and must walk via trickle"); + } + + [Test] + public async Task WalkAsync_RecordsCorrectAbsoluteOffsets_WhenPageRunSpansMultipleBufferWindows() + { + // A stream several times larger than the streaming read buffer (one Ogg page max, ~64 KB), so the + // buffer window compacts and refills repeatedly. The absolute byte cursor must keep recorded seek + // offsets correct across every window advance — proven by byte-identity with the buffer oracle, + // which sees the whole stream at once. + var pages = new List { OggPage(0, OpusHeadPacket()), OggPage(0, OpusTagsPacket()) }; + ulong granule = 0; + var total = pages.Sum(p => p.Length); + while (total < 300_000) + { + granule += 24000; // 0.5 s — one index entry per page + var page = OggPage(granule, AudioPacket(20_000)); // large pages so several fill one window + pages.Add(page); + total += page.Length; + } + + var stream = Concat(pages.ToArray()); + var expected = OggOpusParser.Walk(stream); + var actual = await OggOpusParser.WalkAsync(new MemoryStream(stream)); + + AssertWalksEqual(expected, actual); + Assert.That(actual, Is.Not.Null); + Assert.Multiple(() => + { + // Sanity: the fixture really does span multiple windows and the offsets advanced well past one. 
+ Assert.That(stream.Length, Is.GreaterThan(200_000)); + Assert.That(actual!.SeekIndex.Points.Count, Is.GreaterThan(5)); + Assert.That(actual.SeekIndex.Points[^1].ByteOffset, Is.GreaterThan(100_000UL), + "The last seek offset must be an absolute position deep in the stream, not a window-relative one"); + }); + } + + [Test] + public async Task WalkAsync_NotAnOggStream_ReturnsNull_MatchingBufferOverload() + { + var notOgg = Encoding.ASCII.GetBytes("this is not an ogg stream at all"); + + Assert.That(OggOpusParser.Walk(notOgg), Is.Null, "oracle precondition"); + Assert.That(await OggOpusParser.WalkAsync(new MemoryStream(notOgg)), Is.Null); + } + + [Test] + public async Task WalkAsync_SetupWithoutAudioPages_ReturnsNull_MatchingBufferOverload() + { + var headOnly = Concat(OggPage(0, OpusHeadPacket()), OggPage(0, OpusTagsPacket())); + + Assert.That(OggOpusParser.Walk(headOnly), Is.Null, "oracle precondition"); + Assert.That(await OggOpusParser.WalkAsync(new MemoryStream(headOnly)), Is.Null); + } + + [Test] + public async Task WalkAsync_TruncatedPayload_ReturnsNull_MatchingBufferOverload() + { + // A well-formed stream whose final audio page is cut short within its payload: the page header and + // segment table are intact (so the declared page size is read), but the payload never fully arrives. + var full = Concat( + OggPage(0, OpusHeadPacket()), + OggPage(0, OpusTagsPacket()), + OggPage(48000, AudioPacket(64)), + OggPage(96000, AudioPacket(64))); + var truncated = full[..^5]; // drop the tail of the last page's payload + + Assert.That(OggOpusParser.Walk(truncated), Is.Null, "oracle precondition: truncated payload is unrecoverable"); + Assert.That(await OggOpusParser.WalkAsync(new MemoryStream(truncated)), Is.Null); + } + + // ---- Assertions ------------------------------------------------------------------------- + + private static void AssertWalksEqual(OggOpusWalk? expected, OggOpusWalk? 
actual) + { + if (expected is null) + { + Assert.That(actual, Is.Null, "Streaming walk must be null wherever the buffer oracle is null"); + return; + } + + Assert.That(actual, Is.Not.Null, "Streaming walk must be non-null wherever the buffer oracle is non-null"); + Assert.Multiple(() => + { + Assert.That(actual!.SetupHeaderBytes, Is.EqualTo(expected.SetupHeaderBytes), + "Setup header bytes must be byte-identical"); + Assert.That(actual.SeekIndex.Points, Is.EqualTo(expected.SeekIndex.Points), + "Every seek point (granule + absolute byte offset) must match"); + Assert.That(actual.SeekIndex.TotalDurationSeconds, Is.EqualTo(expected.SeekIndex.TotalDurationSeconds), + "Pre-skip-corrected total duration must match"); + Assert.That(actual.SeekIndex.TotalByteLength, Is.EqualTo(expected.SeekIndex.TotalByteLength), + "Total byte length (end-of-stream clamp) must match"); + Assert.That(actual.SeekIndex.PreSkip, Is.EqualTo(expected.SeekIndex.PreSkip), + "Pre-skip must match"); + }); + } + + // ---- Ogg stream construction helpers (mirrors OggOpusParserTests) ----------------------- + + private static byte[] BuildPreSkipStream() + { + var head = OggPage(0, OpusHeadPacket(preSkip: TestPreSkip)); + var tags = OggPage(0, OpusTagsPacket()); + var a1 = OggPage(48000, AudioPacket(64)); + var a2 = OggPage(96000, AudioPacket(64)); + var a3 = OggPage(144000, AudioPacket(64)); + return Concat(head, tags, a1, a2, a3); + } + + private static byte[] BuildBucketingStream(int pageCount, ulong granuleStep, int payload) + { + var pages = new List { OggPage(0, OpusHeadPacket()), OggPage(0, OpusTagsPacket()) }; + ulong granule = 0; + for (var i = 0; i < pageCount; i++) + { + granule += granuleStep; + pages.Add(OggPage(granule, AudioPacket(payload + i))); + } + return Concat(pages.ToArray()); + } + + private static byte[] OggPage(ulong granule, byte[] packet) + { + var segments = new List(); + var remaining = packet.Length; + while (remaining >= 255) + { + segments.Add(255); + remaining -= 255; + 
} + segments.Add((byte)remaining); + + var header = new byte[OggOpusConstants.OggPageHeaderSize + segments.Count]; + OggOpusConstants.CapturePattern.CopyTo(header); + header[4] = 0; // version + header[5] = 0; // header-type flags + BinaryPrimitives.WriteUInt64LittleEndian(header.AsSpan(OggOpusConstants.GranulePositionOffset, 8), granule); + BinaryPrimitives.WriteUInt32LittleEndian(header.AsSpan(14, 4), 0xDEAD); // serial + BinaryPrimitives.WriteUInt32LittleEndian(header.AsSpan(18, 4), 0); // sequence (unused by parser) + BinaryPrimitives.WriteUInt32LittleEndian(header.AsSpan(22, 4), 0); // checksum (unverified) + header[OggOpusConstants.PageSegmentCountOffset] = (byte)segments.Count; + for (var i = 0; i < segments.Count; i++) + header[OggOpusConstants.OggPageHeaderSize + i] = segments[i]; + + return Concat(header, packet); + } + + private static byte[] OpusHeadPacket(ushort preSkip = 0) + { + var tail = new byte[11]; + tail[0] = 1; // version + tail[1] = 2; // channels + BinaryPrimitives.WriteUInt16LittleEndian(tail.AsSpan(2, 2), preSkip); // pre_skip + BinaryPrimitives.WriteUInt32LittleEndian(tail.AsSpan(4, 4), 48000); // input sample rate + tail[10] = 0; // channel mapping family + return Concat(OggOpusConstants.OpusHeadSignature.ToArray(), tail); + } + + private static byte[] OpusTagsPacket() + { + var vendor = Encoding.ASCII.GetBytes("test"); + var packet = new List(); + packet.AddRange(OggOpusConstants.OpusTagsSignature.ToArray()); + packet.AddRange(BitConverter.GetBytes((uint)vendor.Length)); + packet.AddRange(vendor); + packet.AddRange(BitConverter.GetBytes(0u)); // user comment count + return packet.ToArray(); + } + + private static byte[] AudioPacket(int size) + { + var packet = new byte[size]; + for (var i = 0; i < size; i++) + packet[i] = (byte)(i & 0xFF); + return packet; + } + + private static byte[] Concat(params byte[][] parts) + { + var total = parts.Sum(p => p.Length); + var result = new byte[total]; + var cursor = 0; + foreach (var part in parts) 
+ { + part.CopyTo(result, cursor); + cursor += part.Length; + } + return result; + } + + // ---- TrickleStream ----------------------------------------------------------------------- + + /// + /// A non-seekable, read-only stream wrapper that returns at most 1 byte per + /// call. This drives every + /// FillAsync invocation through its inner assembly loop, exercising the partial-read + /// path that never hits because it satisfies requests in one shot. + /// + private sealed class TrickleStream(byte[] data) : Stream + { + private int _position; + + public override bool CanRead => true; + public override bool CanSeek => false; + public override bool CanWrite => false; + + public override long Length => throw new NotSupportedException(); + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + if (count == 0 || _position >= data.Length) return 0; + buffer[offset] = data[_position++]; + return 1; + } + + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + if (buffer.IsEmpty || _position >= data.Length) return new ValueTask(0); + buffer.Span[0] = data[_position++]; + return new ValueTask(1); + } + + public override void Flush() { } + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); + public override void SetLength(long value) => throw new NotSupportedException(); + public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException(); + } +}