Merge Phase 18.1 (Opus ingest transcode + seek-index sidecar) into streaming-overhaul

This commit is contained in:
daniel-c-harvey
2026-06-23 07:13:46 -04:00
17 changed files with 1317 additions and 0 deletions
+8
View File
@@ -4,6 +4,7 @@ using DeepDrftAPI;
using DeepDrftAPI.Middleware; using DeepDrftAPI.Middleware;
using DeepDrftAPI.Models; using DeepDrftAPI.Models;
using DeepDrftAPI.Services; using DeepDrftAPI.Services;
using DeepDrftAPI.Services.Opus;
using DeepDrftData; using DeepDrftData;
using DeepDrftData.Data; using DeepDrftData.Data;
using DeepDrftData.Repositories; using DeepDrftData.Repositories;
@@ -66,6 +67,13 @@ builder.Services
.AddScoped<ITrackService>(sp => sp.GetRequiredService<TrackManager>()); .AddScoped<ITrackService>(sp => sp.GetRequiredService<TrackManager>());
builder.Services.AddScoped<UnifiedTrackService>(); builder.Services.AddScoped<UnifiedTrackService>();
// Background Opus transcode (Phase 18.1, OQ6). One singleton is both the enqueue seam
// (IOpusTranscodeQueue, injected into the scoped UnifiedTrackService) and the hosted drain loop
// (IHostedService). It resolves OpusTranscodeService — a singleton — so no scope is captured.
builder.Services.AddSingleton<OpusTranscodeBackgroundService>();
builder.Services.AddSingleton<IOpusTranscodeQueue>(sp => sp.GetRequiredService<OpusTranscodeBackgroundService>());
builder.Services.AddHostedService(sp => sp.GetRequiredService<OpusTranscodeBackgroundService>());
// Phase 16 anonymous telemetry — append-only event logs + incremental play-counter rollup (all SQL). // Phase 16 anonymous telemetry — append-only event logs + incremental play-counter rollup (all SQL).
// EventManager is the IEventService boundary; EventRepository owns the EF writes and the // EventManager is the IEventService boundary; EventRepository owns the EF writes and the
// release-resolution + counter-bump transaction. // release-resolution + counter-bump transaction.
@@ -0,0 +1,18 @@
namespace DeepDrftAPI.Services.Opus;
/// <summary>
/// The enqueue seam for the background Opus transcode (OQ6 / §3.1a). <see cref="UnifiedTrackService"/>
/// depends only on this thin interface — not on the worker — so adding the background derive to the
/// upload/replace paths costs one small dependency, not the whole transcode graph. Enqueuing is
/// non-blocking and best-effort: a freshly uploaded track is already persisted and playable losslessly
/// before anything is enqueued, and the transcode runs off the request thread.
/// </summary>
public interface IOpusTranscodeQueue
{
/// <summary>
/// Schedules a background Opus derive for the track identified by <paramref name="entryKey"/>. Returns
/// immediately. A dropped or failed enqueue must not affect the caller — the track remains
/// lossless-only and eligible for backfill.
/// </summary>
void Enqueue(string entryKey);
}
@@ -0,0 +1,72 @@
using System.Threading.Channels;
using DeepDrftContent.Processors.Opus;
namespace DeepDrftAPI.Services.Opus;
/// <summary>
/// The background worker behind <see cref="IOpusTranscodeQueue"/> (OQ6 / §3.1a). An unbounded in-process
/// channel buffers EntryKeys enqueued by the upload and replace-audio paths; a single hosted loop drains
/// them one at a time and runs <see cref="OpusTranscodeService.TranscodeAndStoreAsync"/> for each. Serial
/// by design — a transcode is CPU-heavy (§3.1), so running them concurrently would starve request
/// handling; one-at-a-time keeps the derive strictly off the hot path without saturating the host.
///
/// This worker IS the queue (implements <see cref="IOpusTranscodeQueue"/>) so enqueue and drain share one
/// channel with no extra indirection. It is registered as a singleton and surfaced under both the
/// interface and <see cref="IHostedService"/>.
/// </summary>
public sealed class OpusTranscodeBackgroundService : BackgroundService, IOpusTranscodeQueue
{
private readonly Channel<string> _channel =
Channel.CreateUnbounded<string>(new UnboundedChannelOptions { SingleReader = true });
private readonly OpusTranscodeService _transcodeService;
private readonly ILogger<OpusTranscodeBackgroundService> _logger;
public OpusTranscodeBackgroundService(
OpusTranscodeService transcodeService,
ILogger<OpusTranscodeBackgroundService> logger)
{
_transcodeService = transcodeService;
_logger = logger;
}
public void Enqueue(string entryKey)
{
if (string.IsNullOrWhiteSpace(entryKey))
return;
if (!_channel.Writer.TryWrite(entryKey))
{
// Unbounded writer only rejects after Complete(), i.e. during shutdown. The track stays
// lossless-only and is eligible for backfill, so a dropped enqueue is non-fatal — log it.
_logger.LogWarning("Opus transcode: could not enqueue {EntryKey} (queue closed).", entryKey);
}
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach (var entryKey in _channel.Reader.ReadAllAsync(stoppingToken))
{
try
{
await _transcodeService.TranscodeAndStoreAsync(entryKey, stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break; // host shutting down
}
catch (Exception ex)
{
// TranscodeAndStoreAsync already swallows expected failures; this guards the loop against
// anything unexpected so one bad track never kills the worker.
_logger.LogError(ex, "Opus transcode: unhandled failure draining {EntryKey}; worker continues.", entryKey);
}
}
}
public override Task StopAsync(CancellationToken cancellationToken)
{
_channel.Writer.TryComplete();
return base.StopAsync(cancellationToken);
}
}
@@ -1,3 +1,4 @@
using DeepDrftAPI.Services.Opus;
using DeepDrftContent; using DeepDrftContent;
using DeepDrftContent.Constants; using DeepDrftContent.Constants;
using DeepDrftContent.Processors; using DeepDrftContent.Processors;
@@ -39,6 +40,7 @@ public class UnifiedTrackService
private readonly ITrackService _sqlTrackService; private readonly ITrackService _sqlTrackService;
private readonly FileDb _fileDatabase; private readonly FileDb _fileDatabase;
private readonly WaveformProfileService _waveformProfileService; private readonly WaveformProfileService _waveformProfileService;
private readonly IOpusTranscodeQueue _opusTranscodeQueue;
private readonly ILogger<UnifiedTrackService> _logger; private readonly ILogger<UnifiedTrackService> _logger;
public UnifiedTrackService( public UnifiedTrackService(
@@ -46,12 +48,14 @@ public class UnifiedTrackService
ITrackService sqlTrackService, ITrackService sqlTrackService,
FileDb fileDatabase, FileDb fileDatabase,
WaveformProfileService waveformProfileService, WaveformProfileService waveformProfileService,
IOpusTranscodeQueue opusTranscodeQueue,
ILogger<UnifiedTrackService> logger) ILogger<UnifiedTrackService> logger)
{ {
_contentTrackContentService = contentTrackContentService; _contentTrackContentService = contentTrackContentService;
_sqlTrackService = sqlTrackService; _sqlTrackService = sqlTrackService;
_fileDatabase = fileDatabase; _fileDatabase = fileDatabase;
_waveformProfileService = waveformProfileService; _waveformProfileService = waveformProfileService;
_opusTranscodeQueue = opusTranscodeQueue;
_logger = logger; _logger = logger;
} }
@@ -219,6 +223,11 @@ public class UnifiedTrackService
// frontend, so a failure here is logged and swallowed — never fails the upload. // frontend, so a failure here is logged and swallowed — never fails the upload.
await TryStoreWaveformDatumsAsync(unpersisted.EntryKey, ct); await TryStoreWaveformDatumsAsync(unpersisted.EntryKey, ct);
// Schedule the low-data Opus derive (OQ6 / §3.1a): the track is persisted and lossless-playable
// NOW; the transcode + seek-index build run on a background worker. Non-blocking and best-effort
// — the upload response never waits on it, and a transcode failure leaves the track lossless-only.
_opusTranscodeQueue.Enqueue(unpersisted.EntryKey);
return saveResult; return saveResult;
} }
@@ -303,6 +312,11 @@ public class UnifiedTrackService
return Result.CreateFailResult("Audio replaced but duration metadata could not be updated."); return Result.CreateFailResult("Audio replaced but duration metadata could not be updated.");
} }
// The stale Opus artifact (if any) no longer matches the new source. Schedule a background
// regenerate — the transcode service overwrites the prior artifacts in place keyed by the same
// EntryKey. Best-effort, off the request thread, mirrors the waveform regen above.
_opusTranscodeQueue.Enqueue(entryKey);
return Result.CreatePassResult(); return Result.CreatePassResult();
} }
+22
View File
@@ -4,6 +4,7 @@ using DeepDrftContent.Constants;
using DeepDrftContent.FileDatabase.Models; using DeepDrftContent.FileDatabase.Models;
using DeepDrftContent.FileDatabase.Services; using DeepDrftContent.FileDatabase.Services;
using DeepDrftContent.Processors; using DeepDrftContent.Processors;
using DeepDrftContent.Processors.Opus;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using NetBlocks.Utilities.Environment; using NetBlocks.Utilities.Environment;
@@ -44,6 +45,7 @@ namespace DeepDrftAPI
InitializeTrackVault(db).GetAwaiter().GetResult(); InitializeTrackVault(db).GetAwaiter().GetResult();
InitializeImageVault(db).GetAwaiter().GetResult(); InitializeImageVault(db).GetAwaiter().GetResult();
InitializeTrackWaveformsVault(db).GetAwaiter().GetResult(); InitializeTrackWaveformsVault(db).GetAwaiter().GetResult();
InitializeTrackOpusVault(db).GetAwaiter().GetResult();
return db; return db;
}); });
@@ -65,6 +67,16 @@ namespace DeepDrftAPI
Environment.SetEnvironmentVariable("ASPNETCORE_TEMP", stagingPath); Environment.SetEnvironmentVariable("ASPNETCORE_TEMP", stagingPath);
builder.Services.AddSingleton(new UploadStagingDirectory(stagingPath)); builder.Services.AddSingleton(new UploadStagingDirectory(stagingPath));
// Opus low-data transcode (Phase 18.1). The domain service lives in DeepDrftContent; the host
// owns only the engine config and the background worker. Bitrate/ffmpeg-path come from the
// OpusTranscode config section; StagingPath is forced to the same data-disk staging directory
// the upload path uses so large transcode temp files never land on the /tmp tmpfs.
builder.Services.Configure<OpusTranscodeOptions>(
builder.Configuration.GetSection(nameof(OpusTranscodeOptions)));
builder.Services.PostConfigure<OpusTranscodeOptions>(o => o.StagingPath = stagingPath);
builder.Services.AddSingleton<FfmpegOpusEncoder>();
builder.Services.AddSingleton<OpusTranscodeService>();
return Task.CompletedTask; return Task.CompletedTask;
} }
@@ -107,5 +119,15 @@ namespace DeepDrftAPI
await fileDatabase.CreateVaultAsync(VaultConstants.TrackWaveforms, MediaVaultType.Media); await fileDatabase.CreateVaultAsync(VaultConstants.TrackWaveforms, MediaVaultType.Media);
} }
} }
// Ensure the track-opus vault exists (Phase 18.1). Holds the derived low-data Ogg Opus artifacts
// — the Opus audio bytes and the setup-header + seek-index sidecar — keyed by the track's EntryKey.
private static async Task InitializeTrackOpusVault(FileDatabase fileDatabase)
{
if (!fileDatabase.HasVault(VaultConstants.TrackOpus))
{
await fileDatabase.CreateVaultAsync(VaultConstants.TrackOpus, MediaVaultType.Audio);
}
}
} }
} }
@@ -28,4 +28,13 @@ public static class VaultConstants
/// The datum resolution is duration-derived (≈333 samples/sec, see <c>WaveformResolution</c>). /// The datum resolution is duration-derived (≈333 samples/sec, see <c>WaveformResolution</c>).
/// </summary> /// </summary>
public const string TrackWaveforms = "track-waveforms"; public const string TrackWaveforms = "track-waveforms";
/// <summary>
/// Vault name for the derived low-data Ogg Opus artifacts, keyed by the track's EntryKey (Phase 18,
/// S2). Holds two entries per track: the Opus audio bytes (<c>.opus</c>) and the combined setup-header
/// + granule→byte seek-index sidecar (<c>.opusidx</c>). Both are best-effort derived artifacts —
/// regenerable, and a track without them still plays losslessly. Distinct from the source <c>tracks</c>
/// vault so the source means exactly one thing (mirrors the <c>track-waveforms</c> precedent).
/// </summary>
public const string TrackOpus = "track-opus";
} }
@@ -206,6 +206,7 @@ public static class MimeTypeExtensions
{ ".flac", "audio/flac" }, { ".flac", "audio/flac" },
{ ".aac", "audio/aac" }, { ".aac", "audio/aac" },
{ ".ogg", "audio/ogg" }, { ".ogg", "audio/ogg" },
{ ".opus", "audio/ogg" },
{ ".m4a", "audio/mp4" } { ".m4a", "audio/mp4" }
}; };
@@ -0,0 +1,140 @@
using System.Diagnostics;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace DeepDrftContent.Processors.Opus;
/// <summary>
/// Encodes a source audio file (any format the source vault holds — WAV/MP3/FLAC) to Ogg Opus fullband
/// 320 kbps by shelling out to FFmpeg (libopus). FFmpeg is chosen over a managed encoder because it
/// muxes a correct Ogg container with accurate granule positions across every input format — the page
/// structure the seek-index walk depends on — which a raw libopus binding does not provide. The external
/// <c>ffmpeg</c> binary is therefore a host runtime prerequisite (flagged in the wave handoff).
/// </summary>
public sealed class FfmpegOpusEncoder
{
private readonly OpusTranscodeOptions _options;
private readonly ILogger<FfmpegOpusEncoder> _logger;
public FfmpegOpusEncoder(IOptions<OpusTranscodeOptions> options, ILogger<FfmpegOpusEncoder> logger)
{
_options = options.Value;
_logger = logger;
}
/// <summary>
/// Transcodes <paramref name="sourcePath"/> to an Ogg Opus file at <paramref name="destinationPath"/>.
/// Returns true on a clean exit with a non-empty output. Returns false (logged) on a non-zero exit,
/// a timeout, a missing ffmpeg binary, or any process failure — a transcode failure must never throw
/// to the caller (C6); the background worker treats false as "leave the track lossless-only".
/// </summary>
public async Task<bool> EncodeAsync(string sourcePath, string destinationPath, CancellationToken ct)
{
var ffmpeg = string.IsNullOrWhiteSpace(_options.FfmpegPath) ? "ffmpeg" : _options.FfmpegPath;
// -vn drops any cover-art video stream; -map a:0 takes the first audio stream; -ar 48000 forces
// fullband (Opus internally resamples to 48 kHz anyway, but stating it keeps granulepos math
// unambiguous); libopus VBR at the target bitrate; -f ogg for an explicit Ogg container; -y
// overwrites the (pre-created, empty) destination temp file.
var args = new[]
{
"-hide_banner", "-nostdin", "-loglevel", "error",
"-i", sourcePath,
"-vn", "-map", "a:0",
"-c:a", "libopus", "-b:a", $"{_options.BitrateKbps}k",
"-ar", "48000",
"-f", "ogg",
"-y", destinationPath,
};
var psi = new ProcessStartInfo(ffmpeg)
{
RedirectStandardError = true,
RedirectStandardOutput = true,
UseShellExecute = false,
CreateNoWindow = true,
};
foreach (var arg in args)
psi.ArgumentList.Add(arg);
using var process = new Process { StartInfo = psi };
try
{
if (!process.Start())
{
_logger.LogError("Opus transcode: ffmpeg failed to start ({Ffmpeg}).", ffmpeg);
return false;
}
}
catch (Exception ex)
{
// Most commonly a missing binary (Win32Exception "file not found"). This is the ops
// prerequisite failing — log loudly so it is unmistakable in the deploy logs.
_logger.LogError(ex,
"Opus transcode: could not launch ffmpeg ({Ffmpeg}). Is the ffmpeg binary installed on the host?",
ffmpeg);
return false;
}
using var timeout = CancellationTokenSource.CreateLinkedTokenSource(ct);
timeout.CancelAfter(TimeSpan.FromSeconds(_options.TimeoutSeconds));
// Drain stderr concurrently — ffmpeg can block writing diagnostics if the pipe is not read.
var stderrTask = process.StandardError.ReadToEndAsync(timeout.Token);
try
{
await process.WaitForExitAsync(timeout.Token);
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
TryKill(process);
await SafeStderr(stderrTask); // observe to avoid unobserved-task warnings
throw; // genuine shutdown cancellation — let it propagate
}
catch (OperationCanceledException)
{
TryKill(process);
await SafeStderr(stderrTask); // observe to avoid unobserved-task warnings
_logger.LogError("Opus transcode: ffmpeg exceeded the {Timeout}s timeout for {Source}.",
_options.TimeoutSeconds, sourcePath);
return false;
}
var stderr = await SafeStderr(stderrTask);
if (process.ExitCode != 0)
{
_logger.LogError("Opus transcode: ffmpeg exited {Code} for {Source}. stderr: {Stderr}",
process.ExitCode, sourcePath, stderr);
return false;
}
if (!File.Exists(destinationPath) || new FileInfo(destinationPath).Length == 0)
{
_logger.LogError("Opus transcode: ffmpeg exited 0 but produced no output for {Source}.", sourcePath);
return false;
}
return true;
}
private void TryKill(Process process)
{
try
{
if (!process.HasExited)
process.Kill(entireProcessTree: true);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Opus transcode: failed to kill timed-out ffmpeg process.");
}
}
private static async Task<string> SafeStderr(Task<string> stderrTask)
{
try { return await stderrTask; }
catch { return "<stderr unavailable>"; }
}
}
@@ -0,0 +1,65 @@
namespace DeepDrftContent.Processors.Opus;
/// <summary>
/// Wire-format constants for the Ogg-Opus derived artifacts. Centralised so the seek-index codec,
/// the page walker, and the tests agree on one set of magic numbers.
/// </summary>
public static class OggOpusConstants
{
/// <summary>Opus granule positions are always sample counts at 48 kHz, regardless of input rate.</summary>
public const double OpusSampleRate = 48000.0;
/// <summary>One seek-index entry per this many seconds of audio (OQ7 — 0.5 s buckets).</summary>
public const double SeekBucketSeconds = 0.5;
/// <summary>The Ogg page capture pattern "OggS" — every page starts with these four bytes.</summary>
public static ReadOnlySpan<byte> CapturePattern => "OggS"u8;
/// <summary>Magic signature opening an OpusHead identification header packet.</summary>
public static ReadOnlySpan<byte> OpusHeadSignature => "OpusHead"u8;
/// <summary>Magic signature opening an OpusTags comment header packet.</summary>
public static ReadOnlySpan<byte> OpusTagsSignature => "OpusTags"u8;
/// <summary>
/// Fixed size of an Ogg page header before the segment table: capture(4) + version(1) +
/// header-type(1) + granulepos(8) + serial(4) + sequence(4) + checksum(4) + page-segments(1).
/// </summary>
public const int OggPageHeaderSize = 27;
/// <summary>Byte offset of the 64-bit granule position within an Ogg page header.</summary>
public const int GranulePositionOffset = 6;
/// <summary>Byte offset of the page-segment count (the segment-table length) within the header.</summary>
public const int PageSegmentCountOffset = 26;
/// <summary>Sentinel granule position for a page that ends mid-packet (no usable timestamp).</summary>
public const ulong NoGranulePosition = 0xFFFFFFFFFFFFFFFFUL;
/// <summary>
/// Minimum byte length of an <c>OpusHead</c> packet payload to safely read <c>pre_skip</c>.
/// RFC 7845 §5.1: "OpusHead"(8) + version(1) + channels(1) + pre_skip(2) = 12 bytes minimum.
/// </summary>
public const int OpusHeadMinSize = 12;
/// <summary>
/// Byte offset of <c>pre_skip</c> within the full <c>OpusHead</c> packet payload (including the
/// magic). RFC 7845 §5.1: "OpusHead"(8) + version(1) + channels(1) = 10 bytes before pre_skip.
/// </summary>
public const int OpusHeadPreSkipOffset = 10;
/// <summary>
/// Header size of the serialized seek-index blob:
/// totalBytes(8) + duration(8) + count(4) + preSkip(2) + reserved(2) = 24 bytes.
/// </summary>
public const int SeekIndexHeaderSize = 24;
/// <summary>Size of one serialized seek point: granulepos(8) + byteOffset(8).</summary>
public const int SeekPointSize = 16;
/// <summary>Vault-resource extension for the Opus audio bytes.</summary>
public const string OpusExtension = ".opus";
/// <summary>Vault-resource extension for the combined setup-header + seek-index sidecar.</summary>
public const string SidecarExtension = ".opusidx";
}
@@ -0,0 +1,146 @@
using System.Buffers.Binary;
namespace DeepDrftContent.Processors.Opus;
/// <summary>
/// The result of walking an encoded Ogg Opus stream once: the captured setup header (the leading
/// <c>OpusHead</c> + <c>OpusTags</c> pages, verbatim) and the bucketed granule→byte seek index. This
/// is everything the sidecar artifact carries (§3.4a) — built at transcode time so delivery never
/// re-walks the stream.
/// </summary>
/// <param name="SetupHeaderBytes">The leading setup pages (OpusHead + OpusTags), exactly as they
/// appear at the start of the stream, ready to prepend to any mid-stream page run before decode.</param>
/// <param name="SeekIndex">The accurate, 0.5 s-bucketed granule→byte transfer function.</param>
public sealed record OggOpusWalk(byte[] SetupHeaderBytes, OggOpusSeekIndex SeekIndex);
/// <summary>
/// Pure Ogg-Opus stream walker. Reads the page structure directly (the <c>OggS</c> capture pattern and
/// the 27-byte page header) to (1) capture the setup-header pages and (2) record, for every audio page,
/// its end granule position and exact byte offset — bucketed to 0.5 s with each bucket boundary snapped
/// to the nearest enclosing page start. No external dependency: the encoder (FFmpeg) produces the bytes;
/// this turns them into the seek artifact deterministically, so it is unit-testable without a codec.
/// </summary>
public static class OggOpusParser
{
/// <summary>
/// Walks <paramref name="oggBytes"/> and produces the setup header + seek index, or null if the
/// bytes are not a recognisable Ogg Opus stream (no setup header, no audio pages, or truncated
/// structure). A null is the caller's signal to treat the transcode as failed and leave the track
/// lossless-only (C6) — it does not throw for malformed input.
/// </summary>
public static OggOpusWalk? Walk(ReadOnlySpan<byte> oggBytes)
{
var setupHeaderEnd = -1;
var sawOpusHead = false;
var sawOpusTags = false;
ushort preSkip = 0;
var points = new List<OpusSeekPoint>();
ulong lastGranule = 0;
var nextBucketTime = 0.0;
var firstAudioPointTaken = false;
var offset = 0;
while (offset + OggOpusConstants.OggPageHeaderSize <= oggBytes.Length)
{
var page = oggBytes.Slice(offset);
if (!page[..4].SequenceEqual(OggOpusConstants.CapturePattern))
{
// Not on a page boundary — the encoder writes contiguous pages, so this means the
// stream is malformed or we mis-stepped. Either way it is unrecoverable here.
return null;
}
var segmentCount = page[OggOpusConstants.PageSegmentCountOffset];
var segmentTableEnd = OggOpusConstants.OggPageHeaderSize + segmentCount;
if (segmentTableEnd > page.Length)
return null; // truncated header
var payloadSize = 0;
for (var i = 0; i < segmentCount; i++)
payloadSize += page[OggOpusConstants.OggPageHeaderSize + i];
var pageTotalSize = segmentTableEnd + payloadSize;
if (pageTotalSize > page.Length)
return null; // truncated payload
var payload = page.Slice(segmentTableEnd, payloadSize);
var granule = BinaryPrimitives.ReadUInt64LittleEndian(
page.Slice(OggOpusConstants.GranulePositionOffset, 8));
// The setup pages carry no audio granule (OpusHead has granulepos 0; OpusTags too). They
// are the leading pages whose payload opens with the Opus magic signatures.
if (!sawOpusHead && StartsWith(payload, OggOpusConstants.OpusHeadSignature))
{
sawOpusHead = true;
setupHeaderEnd = offset + pageTotalSize;
// RFC 7845 §5.1 — OpusHead layout after the 8-byte "OpusHead" magic:
// [0] version (1 byte), [1] channel count (1 byte),
// [2-3] pre_skip (little-endian uint16) ← at packet bytes 10-11
// pre_skip is the number of decoder samples to discard before presenting audio;
// all granule→time conversions must subtract it (RFC 7845 §4.3).
if (payload.Length >= OggOpusConstants.OpusHeadMinSize)
preSkip = BinaryPrimitives.ReadUInt16LittleEndian(
payload.Slice(OggOpusConstants.OpusHeadPreSkipOffset, 2));
}
else if (sawOpusHead && !sawOpusTags && StartsWith(payload, OggOpusConstants.OpusTagsSignature))
{
sawOpusTags = true;
setupHeaderEnd = offset + pageTotalSize;
}
else if (sawOpusHead && sawOpusTags)
{
// Audio page. Record the first audio page unconditionally (the seek anchor at t=0),
// then one entry per 0.5 s bucket. A page with no end-granule (mid-packet continuation,
// granulepos == -1) is skipped for indexing — its time is unknown — but still advances
// the byte cursor.
if (granule != OggOpusConstants.NoGranulePosition)
{
// RFC 7845 §4.3: presentation time = max(0, granule preSkip) / 48000.
// Use this corrected time for bucketing so that a stream with pre-skip 3840 (~80 ms)
// does not systematically offset every indexed time by that amount.
var correctedTime = Math.Max(0.0,
(granule - (double)preSkip) / OggOpusConstants.OpusSampleRate);
if (!firstAudioPointTaken)
{
// Anchor the first seek point at corrected time = 0 by storing the granule as
// preSkip. This guarantees that a binary search for t=0 ("largest entry with
// corrected time ≤ 0") always resolves to the first audio page's byte offset —
// even when the real granule is slightly above preSkip due to encoder lead-in.
points.Add(new OpusSeekPoint(preSkip, (ulong)offset));
firstAudioPointTaken = true;
nextBucketTime = OggOpusConstants.SeekBucketSeconds;
}
else if (correctedTime >= nextBucketTime)
{
points.Add(new OpusSeekPoint(granule, (ulong)offset));
// Advance past every bucket this page crossed so a long page does not emit a
// backlog of entries; the next bucket is the first boundary strictly after it.
while (nextBucketTime <= correctedTime)
nextBucketTime += OggOpusConstants.SeekBucketSeconds;
}
lastGranule = granule;
}
}
offset += pageTotalSize;
}
if (!sawOpusHead || setupHeaderEnd < 0 || points.Count == 0)
return null;
var setupHeader = oggBytes[..setupHeaderEnd].ToArray();
// RFC 7845 §4.3: total duration is also pre-skip-corrected, matching the time a listener
// experiences (the last audio page's corrected time, clamped to ≥ 0).
var totalDuration = Math.Max(0.0,
(lastGranule - (double)preSkip) / OggOpusConstants.OpusSampleRate);
var index = new OggOpusSeekIndex(points, totalDuration, (ulong)oggBytes.Length, preSkip);
return new OggOpusWalk(setupHeader, index);
}
private static bool StartsWith(ReadOnlySpan<byte> payload, ReadOnlySpan<byte> signature) =>
payload.Length >= signature.Length && payload[..signature.Length].SequenceEqual(signature);
}
@@ -0,0 +1,124 @@
using System.Buffers.Binary;
namespace DeepDrftContent.Processors.Opus;
/// <summary>
/// A single seek-index entry: an authoritative 48 kHz <see cref="GranulePosition"/> (Opus granule
/// positions are always sample counts at 48 kHz) paired with the exact byte offset of the Ogg page that
/// carries it. Every <see cref="ByteOffset"/> is a real page-start boundary, so a
/// <c>Range: bytes={ByteOffset}-</c> fetch lands the decoder Ogg-sync-aligned.
/// </summary>
/// <remarks>
/// Per RFC 7845 §4.3, the PCM presentation time is <c>(granulepos preSkip) / 48000</c>. The raw
/// <see cref="GranulePosition"/> is stored here as-is; callers should subtract the containing
/// <see cref="OggOpusSeekIndex.PreSkip"/> before converting to a presentation time. Use
/// <see cref="OggOpusSeekIndex.PresentationTimeSeconds"/> for the corrected value.
/// </remarks>
/// <param name="GranulePosition">The page's end granule position (48 kHz sample count).</param>
/// <param name="ByteOffset">The byte offset of the page start in the Opus file.</param>
public readonly record struct OpusSeekPoint(ulong GranulePosition, ulong ByteOffset)
{
/// <summary>
/// Raw granule-position-to-time conversion (granulepos / 48 kHz). Does NOT subtract pre-skip — use
/// <see cref="OggOpusSeekIndex.PresentationTimeSeconds"/> for the RFC 7845-correct presentation time.
/// </summary>
public double RawTimeSeconds => GranulePosition / OggOpusConstants.OpusSampleRate;
}
/// <summary>
/// The accurate, precomputed transfer function from seek-time to true file byte offset for one Ogg
/// Opus stream (§3.4a A). Built once at transcode time by walking the encoded stream; the client reads
/// it back and binary-searches <see cref="Points"/> instead of doing inaccurate VBR byte-rate math.
/// One entry per 0.5 s of audio (<see cref="OggOpusConstants.SeekBucketSeconds"/>), each snapped to the
/// nearest enclosing page start, plus the totals needed to clamp a seek to range.
/// </summary>
/// <param name="Points">Ordered (granulepos, byteOffset) entries, ascending. The first entry always
/// has <see cref="OpusSeekPoint.GranulePosition"/> == <paramref name="PreSkip"/> (corrected time = 0)
/// and points at the first audio page start, ensuring a seek to t=0 always resolves.</param>
/// <param name="TotalDurationSeconds">
/// Pre-skip-corrected total stream duration: <c>max(0, lastGranule preSkip) / 48000</c>.
/// </param>
/// <param name="TotalByteLength">Total Opus file byte length, for clamping a seek past the end.</param>
/// <param name="PreSkip">
/// The <c>pre_skip</c> value from the <c>OpusHead</c> identification header (RFC 7845 §5.1). Opus
/// decoders must discard this many samples from the decoded start before presenting audio. The client
/// (wave 18.4) needs this to trim the first decoded buffer; storing it here avoids a re-parse of the
/// Ogg stream at delivery time.
/// </param>
public sealed record OggOpusSeekIndex(
IReadOnlyList<OpusSeekPoint> Points,
double TotalDurationSeconds,
ulong TotalByteLength,
ushort PreSkip)
{
/// <summary>
/// Returns the RFC 7845-correct presentation time for a seek point: <c>max(0, granule preSkip) / 48000</c>.
/// Use this for all time comparisons; raw <see cref="OpusSeekPoint.RawTimeSeconds"/> omits the pre-skip.
/// </summary>
public double PresentationTimeSeconds(OpusSeekPoint point) =>
Math.Max(0.0, (point.GranulePosition - (double)PreSkip) / OggOpusConstants.OpusSampleRate);
/// <summary>
/// Serializes the index to the compact little-endian binary blob the sidecar stores. Layout:
/// <c>[uint64 totalByteLength][double totalDurationSeconds][uint32 pointCount][uint16 preSkip][uint16 reserved]</c>
/// then <c>pointCount × (uint64 granulepos, uint64 byteOffset)</c>. The four-byte preSkip+reserved
/// region pads the header to 24 bytes, keeping the point table 8-byte-aligned.
/// Fixed-width records keep the client parse to a single typed-array read.
/// </summary>
public byte[] ToBytes()
{
var size = OggOpusConstants.SeekIndexHeaderSize + Points.Count * OggOpusConstants.SeekPointSize;
var bytes = new byte[size];
var span = bytes.AsSpan();
BinaryPrimitives.WriteUInt64LittleEndian(span[..8], TotalByteLength);
BinaryPrimitives.WriteDoubleLittleEndian(span.Slice(8, 8), TotalDurationSeconds);
BinaryPrimitives.WriteUInt32LittleEndian(span.Slice(16, 4), (uint)Points.Count);
BinaryPrimitives.WriteUInt16LittleEndian(span.Slice(20, 2), PreSkip);
// bytes 22-23: reserved (zero-initialized by array allocation)
var cursor = OggOpusConstants.SeekIndexHeaderSize;
foreach (var point in Points)
{
BinaryPrimitives.WriteUInt64LittleEndian(span.Slice(cursor, 8), point.GranulePosition);
BinaryPrimitives.WriteUInt64LittleEndian(span.Slice(cursor + 8, 8), point.ByteOffset);
cursor += OggOpusConstants.SeekPointSize;
}
return bytes;
}
/// <summary>
/// Parses a blob produced by <see cref="ToBytes"/>. Returns null if the blob is too short or its
/// declared point count does not fit — the storage contract is exact, so a malformed blob is a
/// corruption signal, not a recoverable shape. (Provided so tests and any future server-side reader
/// share one codec with the writer.)
/// </summary>
public static OggOpusSeekIndex? FromBytes(ReadOnlySpan<byte> bytes)
{
if (bytes.Length < OggOpusConstants.SeekIndexHeaderSize)
return null;
var totalByteLength = BinaryPrimitives.ReadUInt64LittleEndian(bytes[..8]);
var totalDuration = BinaryPrimitives.ReadDoubleLittleEndian(bytes.Slice(8, 8));
var count = BinaryPrimitives.ReadUInt32LittleEndian(bytes.Slice(16, 4));
var preSkip = BinaryPrimitives.ReadUInt16LittleEndian(bytes.Slice(20, 2));
// bytes 22-23: reserved — ignored on read for forward-compatibility
var expected = OggOpusConstants.SeekIndexHeaderSize + (long)count * OggOpusConstants.SeekPointSize;
if (bytes.Length < expected)
return null;
var points = new OpusSeekPoint[count];
var cursor = OggOpusConstants.SeekIndexHeaderSize;
for (var i = 0; i < count; i++)
{
var granule = BinaryPrimitives.ReadUInt64LittleEndian(bytes.Slice(cursor, 8));
var offset = BinaryPrimitives.ReadUInt64LittleEndian(bytes.Slice(cursor + 8, 8));
points[i] = new OpusSeekPoint(granule, offset);
cursor += OggOpusConstants.SeekPointSize;
}
return new OggOpusSeekIndex(points, totalDuration, totalByteLength, preSkip);
}
}
@@ -0,0 +1,57 @@
using System.Buffers.Binary;
namespace DeepDrftContent.Processors.Opus;
/// <summary>
/// The single derived sidecar artifact per track (§3.4a B, recommended design): the Opus setup header
/// (<c>OpusHead</c> + <c>OpusTags</c>) followed by the granule→byte seek index. The client fetches this
/// once on track load and parses it into its <c>OpusSeekData</c>, so it always has both the setup bytes
/// (to prepend to any mid-stream slice) and the accurate seek transfer function before it ever issues a
/// Range fetch — including a window that opens away from byte 0 (UC9).
/// </summary>
/// <param name="SetupHeaderBytes">The verbatim OpusHead + OpusTags pages.</param>
/// <param name="SeekIndex">The bucketed granule→byte seek index.</param>
public sealed record OpusSidecar(byte[] SetupHeaderBytes, OggOpusSeekIndex SeekIndex)
{
/// <summary>
/// Serializes to <c>[uint32 setupHeaderLength][setup-header bytes][seek-index blob]</c>. The
/// length prefix lets the client split the two regions with one read; the seek-index blob carries
/// its own self-describing header (<see cref="OggOpusSeekIndex.ToBytes"/>), so it needs no trailing
/// length.
/// </summary>
public byte[] ToBytes()
{
var indexBytes = SeekIndex.ToBytes();
var bytes = new byte[4 + SetupHeaderBytes.Length + indexBytes.Length];
var span = bytes.AsSpan();
BinaryPrimitives.WriteUInt32LittleEndian(span[..4], (uint)SetupHeaderBytes.Length);
SetupHeaderBytes.CopyTo(span.Slice(4));
indexBytes.CopyTo(span.Slice(4 + SetupHeaderBytes.Length));
return bytes;
}
/// <summary>
/// Parses a blob produced by <see cref="ToBytes"/>. Returns null on any structural inconsistency
/// (short blob, length prefix that overruns, or an unparseable index) — the format is exact, so a
/// malformed blob is corruption.
/// </summary>
public static OpusSidecar? FromBytes(ReadOnlySpan<byte> bytes)
{
if (bytes.Length < 4)
return null;
var setupLength = BinaryPrimitives.ReadUInt32LittleEndian(bytes[..4]);
var indexStart = 4 + (long)setupLength;
if (bytes.Length < indexStart)
return null;
var setupHeader = bytes.Slice(4, (int)setupLength).ToArray();
var index = OggOpusSeekIndex.FromBytes(bytes.Slice((int)indexStart));
if (index is null)
return null;
return new OpusSidecar(setupHeader, index);
}
}
@@ -0,0 +1,33 @@
namespace DeepDrftContent.Processors.Opus;
/// <summary>
/// Host-supplied configuration for the Opus transcode. The only operationally significant knob is
/// <see cref="FfmpegPath"/> — the transcode shells out to FFmpeg (libopus), which must be present on the
/// DeepDrftAPI host (see the wave handoff notes). Defaults target Ogg Opus fullband (48 kHz) at 320 kbps,
/// the artifact the spec fixes (§1).
/// </summary>
public sealed class OpusTranscodeOptions
{
/// <summary>
/// Path to the ffmpeg executable. Empty/null resolves to <c>"ffmpeg"</c> (found on PATH). Override
/// with an absolute path when the binary is not on the host PATH.
/// </summary>
public string FfmpegPath { get; set; } = "ffmpeg";
/// <summary>Target Opus bitrate in kbps. 320 kbps fullband is the fixed artifact quality (§1).</summary>
public int BitrateKbps { get; set; } = 320;
/// <summary>
/// Directory for the transient source/output files the transcode stages. Defaults to the system
/// temp path; the host overrides it to the data-disk upload-staging directory so large files never
/// land on the small RAM-backed <c>/tmp</c> tmpfs (same constraint the upload path already honours).
/// </summary>
public string StagingPath { get; set; } = Path.GetTempPath();
/// <summary>
/// Hard ceiling on a single transcode, in seconds. A run that exceeds it is killed and the track
/// stays lossless-only (C6). Generous by default — a 1 GB mix is CPU-expensive (§3.1) — but bounded
/// so a hung ffmpeg never wedges the background worker.
/// </summary>
public int TimeoutSeconds { get; set; } = 3600;
}
@@ -0,0 +1,154 @@
using DeepDrftContent.Constants;
using DeepDrftContent.FileDatabase.Models;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using FileDb = DeepDrftContent.FileDatabase.Services.FileDatabase;
namespace DeepDrftContent.Processors.Opus;
/// <summary>
/// Derives and persists a track's low-data Ogg Opus artifacts (Phase 18.1). Mirrors
/// <see cref="WaveformProfileService"/>'s derived-artifact lifecycle: compute from the stored source,
/// store in a dedicated vault keyed by <c>EntryKey</c>, regenerable, failure-tolerant. For one track it
/// produces two entries in the <see cref="VaultConstants.TrackOpus"/> vault — the Opus audio bytes and a
/// combined setup-header + seek-index sidecar (§3.4a). Strictly additive: the source <c>tracks</c> vault
/// is never touched, and a failure here leaves the track lossless-only and eligible for backfill (C2/C6).
/// </summary>
public sealed class OpusTranscodeService
{
private readonly FileDb _fileDatabase;
private readonly FfmpegOpusEncoder _encoder;
private readonly OpusTranscodeOptions _options;
private readonly ILogger<OpusTranscodeService> _logger;
public OpusTranscodeService(
FileDb fileDatabase,
FfmpegOpusEncoder encoder,
IOptions<OpusTranscodeOptions> options,
ILogger<OpusTranscodeService> logger)
{
_fileDatabase = fileDatabase;
_encoder = encoder;
_options = options.Value;
_logger = logger;
}
/// <summary>
/// Reads the source audio for <paramref name="entryKey"/> from the <c>tracks</c> vault, transcodes it
/// to Ogg Opus 320, walks the encoded stream to build the seek index + capture the setup header, and
/// stores the Opus bytes and the sidecar in the <see cref="VaultConstants.TrackOpus"/> vault under the
/// same key. Re-runnable — a second call overwrites the prior artifacts (backfill / replace-audio).
/// Returns false (logged) on any failure; never throws for expected failure modes (C6). The only
/// propagated exception is <see cref="OperationCanceledException"/> on genuine shutdown.
/// </summary>
public async Task<bool> TranscodeAndStoreAsync(string entryKey, CancellationToken ct)
{
var source = await _fileDatabase.LoadResourceAsync<AudioBinary>(VaultConstants.Tracks, entryKey);
if (source is null)
{
_logger.LogWarning("Opus transcode: no source audio in vault for {EntryKey}; skipping.", entryKey);
return false;
}
Directory.CreateDirectory(_options.StagingPath);
var sourcePath = Path.Combine(_options.StagingPath, $"opus-src-{Guid.NewGuid():N}{source.Extension}");
var opusPath = Path.Combine(_options.StagingPath, $"opus-out-{Guid.NewGuid():N}{OggOpusConstants.OpusExtension}");
try
{
await File.WriteAllBytesAsync(sourcePath, source.Buffer, ct);
if (!await _encoder.EncodeAsync(sourcePath, opusPath, ct))
return false; // encoder already logged the cause
var opusBytes = await File.ReadAllBytesAsync(opusPath, ct);
var walk = OggOpusParser.Walk(opusBytes);
if (walk is null)
{
_logger.LogError(
"Opus transcode: ffmpeg produced output but the Ogg stream could not be walked for {EntryKey}; " +
"no artifacts stored.", entryKey);
return false;
}
await EnsureVaultAsync();
var opusBitrate = source.Duration > 0
? (int)(opusBytes.Length * 8 / source.Duration / 1000)
: _options.BitrateKbps;
var audioBinary = new AudioBinary(new AudioBinaryParams(
opusBytes, opusBytes.Length, OggOpusConstants.OpusExtension, source.Duration, opusBitrate));
var sidecar = new OpusSidecar(walk.SetupHeaderBytes, walk.SeekIndex).ToBytes();
var sidecarBinary = new MediaBinary(new MediaBinaryParams(
sidecar, sidecar.Length, OggOpusConstants.SidecarExtension));
// Store the audio first, then the sidecar. If the sidecar write fails the Opus bytes are
// present but unseekable — treat that as a failed derive (return false) so a backfill re-runs
// it; do not leave a half-derived track that the delivery layer would treat as complete.
var audioStored = await _fileDatabase.RegisterResourceAsync(
VaultConstants.TrackOpus, OpusAudioKey(entryKey), audioBinary);
if (!audioStored)
{
_logger.LogError("Opus transcode: vault write of Opus audio failed for {EntryKey}.", entryKey);
return false;
}
var sidecarStored = await _fileDatabase.RegisterResourceAsync(
VaultConstants.TrackOpus, OpusSidecarKey(entryKey), sidecarBinary);
if (!sidecarStored)
{
_logger.LogError("Opus transcode: vault write of sidecar failed for {EntryKey}.", entryKey);
return false;
}
_logger.LogInformation(
"Opus transcode complete for {EntryKey}: {OpusBytes} bytes, {Points} seek points, {Duration:F1}s.",
entryKey, opusBytes.Length, walk.SeekIndex.Points.Count, walk.SeekIndex.TotalDurationSeconds);
return true;
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
throw;
}
catch (Exception ex)
{
_logger.LogError(ex, "Opus transcode failed for {EntryKey}; track stays lossless-only.", entryKey);
return false;
}
finally
{
TryDelete(sourcePath);
TryDelete(opusPath);
}
}
/// <summary>The vault entry key under which a track's Opus audio bytes are stored.</summary>
public static string OpusAudioKey(string entryKey) => entryKey;
/// <summary>The vault entry key under which a track's setup-header + seek-index sidecar is stored.</summary>
public static string OpusSidecarKey(string entryKey) => $"{entryKey}-sidecar";
private async Task EnsureVaultAsync()
{
// The TrackOpus vault is created at host startup (Startup.cs), so this guard is normally a
// no-op for the upload path. It is retained for the backfill path, which may run via a
// standalone CLI or a host that skips vault pre-creation, where the vault might not exist.
if (!_fileDatabase.HasVault(VaultConstants.TrackOpus))
await _fileDatabase.CreateVaultAsync(VaultConstants.TrackOpus, MediaVaultType.Audio);
}
private void TryDelete(string path)
{
try
{
if (File.Exists(path))
File.Delete(path);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Opus transcode: failed to delete staging file {Path}.", path);
}
}
}
+16
View File
@@ -0,0 +1,16 @@
using DeepDrftAPI.Services.Opus;
namespace DeepDrftTests;
/// <summary>
/// Test double for <see cref="IOpusTranscodeQueue"/>. The background Opus derive is out of scope for the
/// upload/dual-database tests — those assert SQL + source-vault behaviour, which is unchanged by Phase 18
/// (Opus is strictly additive). Enqueuing is a no-op so the upload path under test never spins up a real
/// transcode. Records the enqueued keys in case a test wants to assert the scheduling contract.
/// </summary>
public sealed class NoOpOpusTranscodeQueue : IOpusTranscodeQueue
{
public List<string> Enqueued { get; } = [];
public void Enqueue(string entryKey) => Enqueued.Add(entryKey);
}
+437
View File
@@ -0,0 +1,437 @@
using System.Buffers.Binary;
using System.Text;
using DeepDrftContent.Processors.Opus;
namespace DeepDrftTests;
/// <summary>
/// Coverage for the Phase 18.1 seek-index + setup-header extraction (§3.4a). These exercise the pure
/// Ogg-Opus walker and the sidecar codec over hand-built Ogg streams — no ffmpeg dependency — so the
/// granule→byte mapping, pre-skip correction (RFC 7845 §4.3), page-boundary snapping, 0.5 s bucketing,
/// t=0 anchor, clamp totals, and setup-header capture are asserted deterministically. The byte layout
/// mirrors a real Opus stream: an OpusHead page, an OpusTags page, then audio pages each carrying an
/// end granule position at 48 kHz.
/// </summary>
[TestFixture]
public class OggOpusParserTests
{
// libopus default pre-skip: 312 samples at 48 kHz (≈ 6.5 ms). FFmpeg may use 3840 (~80 ms).
// Using 312 here as a realistic non-zero value that is small enough not to affect the test
// granules (which start at 48000), while still exercising the pre-skip subtraction path.
private const ushort TestPreSkip = 312;
[Test]
public void Walk_CapturesSetupHeader_AsLeadingOpusHeadAndOpusTagsPagesVerbatim()
{
var head = OggPage(granule: 0, OpusHeadPacket());
var tags = OggPage(granule: 0, OpusTagsPacket());
var audio = OggPage(granule: 48000, AudioPacket(64));
var stream = Concat(head, tags, audio);
var walk = OggOpusParser.Walk(stream);
Assert.That(walk, Is.Not.Null, "A well-formed Ogg Opus stream must walk");
var expectedSetup = Concat(head, tags);
Assert.That(walk!.SetupHeaderBytes, Is.EqualTo(expectedSetup),
"Setup header must be the OpusHead + OpusTags pages, byte-for-byte, and stop before the first audio page");
}
[Test]
public void Walk_FirstSeekPoint_IsAnchoredAtTimeZero_PointingAtFirstAudioPage()
{
// Use a non-zero pre-skip to verify that the first seek point is explicitly anchored at
// corrected time = 0, not at the raw granule time.
var head = OggPage(granule: 0, OpusHeadPacket(preSkip: TestPreSkip));
var tags = OggPage(granule: 0, OpusTagsPacket());
var audio = OggPage(granule: 48000, AudioPacket(64));
var stream = Concat(head, tags, audio);
var firstAudioOffset = (ulong)(head.Length + tags.Length);
var walk = OggOpusParser.Walk(stream);
Assert.That(walk, Is.Not.Null);
var first = walk!.SeekIndex.Points[0];
// The first point's byte offset must be the first audio page start (exact page boundary).
Assert.That(first.ByteOffset, Is.EqualTo(firstAudioOffset),
"The first seek point must land on the first audio page's start offset (an exact page boundary)");
// The first point's stored granule is clamped to preSkip so corrected presentation time = 0.
// This guarantees a binary search for t=0 always resolves to the first audio page.
Assert.That(walk.SeekIndex.PreSkip, Is.EqualTo(TestPreSkip),
"PreSkip must be parsed from OpusHead and carried into the seek index");
Assert.That(walk.SeekIndex.PresentationTimeSeconds(first), Is.EqualTo(0.0).Within(1e-12),
"First seek point must have corrected presentation time = 0 so a seek to t=0 always resolves");
}
[Test]
public void Walk_PreSkip_IsSubtractedFromGranuleInTimeCalculations()
{
// Pre-skip of 3840 samples (≈ 80 ms, the libopus typical value used by ffmpeg).
// Without the fix, pageTime = 48000 / 48000 = 1.0 s; with fix, (48000 - 3840) / 48000 = 0.92 s.
const ushort preSkip = 3840;
var head = OggPage(granule: 0, OpusHeadPacket(preSkip: preSkip));
var tags = OggPage(granule: 0, OpusTagsPacket());
// First audio page at granule 48000 (1.0 s raw; 0.92 s corrected)
var a1 = OggPage(granule: 48000, AudioPacket(64));
// Second audio page at granule 96000 (2.0 s raw; 1.92 s corrected)
var a2 = OggPage(granule: 96000, AudioPacket(64));
// Third audio page at granule 144000 (3.0 s raw; 2.92 s corrected)
var a3 = OggPage(granule: 144000, AudioPacket(64));
var walk = OggOpusParser.Walk(Concat(head, tags, a1, a2, a3));
Assert.That(walk, Is.Not.Null);
var index = walk!.SeekIndex;
Assert.That(index.PreSkip, Is.EqualTo(preSkip), "PreSkip must be parsed from OpusHead");
// TotalDurationSeconds must be pre-skip-corrected: (144000 - 3840) / 48000 = 2.92 s
var expectedDuration = (144000.0 - preSkip) / 48000.0;
Assert.That(index.TotalDurationSeconds, Is.EqualTo(expectedDuration).Within(1e-9),
"TotalDurationSeconds must subtract preSkip (RFC 7845 §4.3), not use raw lastGranule / 48000");
// The second indexed point (first real bucket) must have corrected time, not raw time.
// With correctedTime(a2) = 1.92 s and bucket = 0.5 s, it should fall in the 1.5 s bucket.
if (index.Points.Count > 1)
{
var secondPoint = index.Points[1];
var corrected = index.PresentationTimeSeconds(secondPoint);
Assert.That(corrected, Is.GreaterThan(0.0),
"Non-first indexed points must have positive corrected presentation times");
Assert.That(secondPoint.RawTimeSeconds, Is.GreaterThan(corrected),
"Raw time must be greater than corrected time when pre-skip > 0");
}
}
[Test]
public void Walk_SeekToZero_ResolvesToFirstAudioPageOffset_WithNonZeroPreSkip()
{
// This is the AC9 / Critical-2 regression test: a seek to t=0 must resolve to the first audio
// page's byte offset, not produce "no entry found". With the old code (no t=0 anchor and no
// pre-skip correction), the first indexed point had correctedTime ≈ 0.92 s (for preSkip=3840),
// so a binary search for t=0 would find no entry with time ≤ 0 and fail.
const ushort preSkip = 3840;
var head = OggPage(granule: 0, OpusHeadPacket(preSkip: preSkip));
var tags = OggPage(granule: 0, OpusTagsPacket());
var a1 = OggPage(granule: 48000, AudioPacket(64));
var a2 = OggPage(granule: 96000, AudioPacket(64));
var stream = Concat(head, tags, a1, a2);
var firstAudioByteOffset = (ulong)(head.Length + tags.Length);
var walk = OggOpusParser.Walk(stream);
Assert.That(walk, Is.Not.Null);
var index = walk!.SeekIndex;
var firstPoint = index.Points[0];
// Simulate the binary search: find the largest entry with PresentationTimeSeconds ≤ 0.
// With the fix, the first point has corrected time = 0.0, so it IS found.
Assert.That(index.PresentationTimeSeconds(firstPoint), Is.EqualTo(0.0).Within(1e-12),
"First point corrected time must be exactly 0.0 so binary search for t=0 resolves it");
Assert.That(firstPoint.ByteOffset, Is.EqualTo(firstAudioByteOffset),
"The t=0 anchor must point at the first audio page's byte offset, not the stream start");
}
[Test]
public void Walk_EverySeekOffset_LandsOnARealPageBoundary()
{
// Ten audio pages, ~0.25 s each (12000 samples). Bucketing at 0.5 s means roughly every other
// page is indexed; every indexed offset must still be the start of some OggS page.
var head = OggPage(granule: 0, OpusHeadPacket());
var tags = OggPage(granule: 0, OpusTagsPacket());
var pages = new List<byte[]> { head, tags };
ulong granule = 0;
for (var i = 0; i < 10; i++)
{
granule += 12000; // 0.25 s at 48 kHz
pages.Add(OggPage(granule, AudioPacket(50 + i)));
}
var stream = Concat(pages.ToArray());
var pageOffsets = CollectPageStartOffsets(stream);
var walk = OggOpusParser.Walk(stream);
Assert.That(walk, Is.Not.Null);
foreach (var point in walk!.SeekIndex.Points)
{
Assert.That(pageOffsets, Does.Contain(point.ByteOffset),
$"Seek offset {point.ByteOffset} must be a real OggS page start");
}
}
[Test]
public void Walk_Bucketing_EmitsRoughlyOneEntryPerHalfSecond()
{
// Twenty audio pages of 0.25 s each = 5 s total (zero pre-skip). At 0.5 s buckets:
// first point (anchored at t=0) + one per 0.5 s boundary = 1 + 10 = 11 entries expected.
var head = OggPage(granule: 0, OpusHeadPacket());
var tags = OggPage(granule: 0, OpusTagsPacket());
var pages = new List<byte[]> { head, tags };
ulong granule = 0;
for (var i = 0; i < 20; i++)
{
granule += 12000; // 0.25 s
pages.Add(OggPage(granule, AudioPacket(40)));
}
var stream = Concat(pages.ToArray());
var walk = OggOpusParser.Walk(stream);
Assert.That(walk, Is.Not.Null);
// 5 s of audio with 0.5 s buckets: 1 anchor + 10 bucket crossings = 11 entries.
// Accept 1012 for floating-point boundary tolerance, but must be far below 20 (one per page).
Assert.That(walk!.SeekIndex.Points.Count, Is.InRange(10, 12),
"Bucketing must coalesce ~0.25 s pages into ~0.5 s index entries, not one per page");
}
[Test]
public void Walk_PointsAreStrictlyAscending_InBothGranuleAndOffset()
{
var head = OggPage(granule: 0, OpusHeadPacket());
var tags = OggPage(granule: 0, OpusTagsPacket());
var pages = new List<byte[]> { head, tags };
ulong granule = 0;
for (var i = 0; i < 12; i++)
{
granule += 24000; // 0.5 s — one index entry per page
pages.Add(OggPage(granule, AudioPacket(30)));
}
var walk = OggOpusParser.Walk(Concat(pages.ToArray()));
Assert.That(walk, Is.Not.Null);
var points = walk!.SeekIndex.Points;
for (var i = 1; i < points.Count; i++)
{
Assert.That(points[i].GranulePosition, Is.GreaterThan(points[i - 1].GranulePosition));
Assert.That(points[i].ByteOffset, Is.GreaterThan(points[i - 1].ByteOffset));
}
}
[Test]
public void Walk_ClampValues_ReflectPreSkipCorrectedDurationAndTotalByteLength()
{
const ushort preSkip = 312;
var head = OggPage(granule: 0, OpusHeadPacket(preSkip: preSkip));
var tags = OggPage(granule: 0, OpusTagsPacket());
var a1 = OggPage(granule: 48000, AudioPacket(64)); // 1.0 s raw; ~0.9935 s corrected
var a2 = OggPage(granule: 144000, AudioPacket(64)); // 3.0 s raw; ~2.9935 s corrected (final)
var stream = Concat(head, tags, a1, a2);
var walk = OggOpusParser.Walk(stream);
Assert.That(walk, Is.Not.Null);
Assert.That(walk!.SeekIndex.TotalByteLength, Is.EqualTo((ulong)stream.Length),
"Total byte length must equal the full stream length for end-of-stream clamping");
var expectedDuration = (144000.0 - preSkip) / 48000.0;
Assert.That(walk.SeekIndex.TotalDurationSeconds, Is.EqualTo(expectedDuration).Within(1e-9),
"TotalDurationSeconds must be pre-skip-corrected: (lastGranule - preSkip) / 48000");
Assert.That(walk.SeekIndex.PreSkip, Is.EqualTo(preSkip),
"PreSkip must round-trip through the seek index");
}
[Test]
public void Walk_MalformedStream_ReturnsNull_RatherThanThrowing()
{
var notOgg = Encoding.ASCII.GetBytes("this is not an ogg stream at all");
Assert.That(OggOpusParser.Walk(notOgg), Is.Null);
// OpusHead present but no audio pages → no seek points → null (nothing to index).
var headOnly = Concat(OggPage(0, OpusHeadPacket()), OggPage(0, OpusTagsPacket()));
Assert.That(OggOpusParser.Walk(headOnly), Is.Null);
}
[Test]
public void SeekIndex_RoundTrips_ThroughBinaryEncoding()
{
var points = new[]
{
new OpusSeekPoint(312, 200), // first point anchored at preSkip
new OpusSeekPoint(72000, 512),
new OpusSeekPoint(96000, 900),
};
var index = new OggOpusSeekIndex(points, TotalDurationSeconds: 2.0, TotalByteLength: 1024,
PreSkip: 312);
var restored = OggOpusSeekIndex.FromBytes(index.ToBytes());
Assert.That(restored, Is.Not.Null);
Assert.That(restored!.TotalByteLength, Is.EqualTo(1024UL));
Assert.That(restored.TotalDurationSeconds, Is.EqualTo(2.0));
Assert.That(restored.PreSkip, Is.EqualTo((ushort)312),
"PreSkip must survive the binary round-trip");
Assert.That(restored.Points, Is.EqualTo(points));
}
[Test]
public void SeekIndex_PresentationTimeSeconds_SubtractsPreSkip()
{
const ushort preSkip = 3840;
var point = new OpusSeekPoint(GranulePosition: 48000, ByteOffset: 200);
var index = new OggOpusSeekIndex(
new[] { point }, TotalDurationSeconds: 0.92, TotalByteLength: 500, PreSkip: preSkip);
var corrected = index.PresentationTimeSeconds(point);
var expected = (48000.0 - preSkip) / 48000.0; // ≈ 0.92 s
Assert.That(corrected, Is.EqualTo(expected).Within(1e-9),
"PresentationTimeSeconds must return (granule - preSkip) / 48000, not raw granule / 48000");
}
[Test]
public void SeekIndex_PresentationTimeSeconds_ClampsToZeroForFirstAnchorPoint()
{
const ushort preSkip = 3840;
// First anchor point: granule stored as preSkip, so corrected time = 0.
var firstPoint = new OpusSeekPoint(GranulePosition: preSkip, ByteOffset: 150);
var index = new OggOpusSeekIndex(
new[] { firstPoint }, TotalDurationSeconds: 2.0, TotalByteLength: 500, PreSkip: preSkip);
Assert.That(index.PresentationTimeSeconds(firstPoint), Is.EqualTo(0.0).Within(1e-12),
"The t=0 anchor point (granule == preSkip) must yield corrected time = 0.0 exactly");
}
[Test]
public void Sidecar_RoundTrips_PreservingSetupHeaderAndIndex()
{
var setup = Encoding.ASCII.GetBytes("OpusHead-and-OpusTags-bytes-go-here");
var index = new OggOpusSeekIndex(
new[] { new OpusSeekPoint(312, 200), new OpusSeekPoint(96000, 700) },
TotalDurationSeconds: 2.0, TotalByteLength: 800, PreSkip: 312);
var sidecar = new OpusSidecar(setup, index);
var restored = OpusSidecar.FromBytes(sidecar.ToBytes());
Assert.That(restored, Is.Not.Null);
Assert.That(restored!.SetupHeaderBytes, Is.EqualTo(setup),
"The sidecar must preserve the setup header so the client can prepend it to mid-stream slices");
Assert.That(restored.SeekIndex.Points, Is.EqualTo(index.Points));
Assert.That(restored.SeekIndex.TotalByteLength, Is.EqualTo(800UL));
Assert.That(restored.SeekIndex.PreSkip, Is.EqualTo((ushort)312),
"PreSkip must survive the sidecar binary round-trip");
}
[Test]
public void Sidecar_FromBytes_RejectsTruncatedBlob()
{
Assert.That(OpusSidecar.FromBytes(new byte[2]), Is.Null, "A blob shorter than the length prefix is corruption");
// A length prefix that overruns the buffer must be rejected, not over-read.
var bad = new byte[8];
BinaryPrimitives.WriteUInt32LittleEndian(bad, 9999);
Assert.That(OpusSidecar.FromBytes(bad), Is.Null);
}
// ---- Ogg stream construction helpers (minimal, single-packet pages) ----
// Builds one Ogg page wrapping a single packet payload with the given end granule position. The page
// header layout matches the spec the parser reads: capture "OggS", version, header-type, granulepos,
// serial, sequence, checksum (zeroed — the parser does not verify CRC), page-segments, segment table.
private static byte[] OggPage(ulong granule, byte[] packet)
{
// Lacing: a packet of length L is split into 255-byte segments plus a final < 255 segment.
var segments = new List<byte>();
var remaining = packet.Length;
while (remaining >= 255)
{
segments.Add(255);
remaining -= 255;
}
segments.Add((byte)remaining);
var header = new byte[OggOpusConstants.OggPageHeaderSize + segments.Count];
OggOpusConstants.CapturePattern.CopyTo(header);
header[4] = 0; // version
header[5] = 0; // header-type flags
BinaryPrimitives.WriteUInt64LittleEndian(header.AsSpan(OggOpusConstants.GranulePositionOffset, 8), granule);
BinaryPrimitives.WriteUInt32LittleEndian(header.AsSpan(14, 4), 0xDEAD); // serial
BinaryPrimitives.WriteUInt32LittleEndian(header.AsSpan(18, 4), 0); // sequence (unused by parser)
BinaryPrimitives.WriteUInt32LittleEndian(header.AsSpan(22, 4), 0); // checksum (unverified)
header[OggOpusConstants.PageSegmentCountOffset] = (byte)segments.Count;
for (var i = 0; i < segments.Count; i++)
header[OggOpusConstants.OggPageHeaderSize + i] = segments[i];
return Concat(header, packet);
}
private static byte[] OpusHeadPacket(ushort preSkip = 0)
{
// "OpusHead" + RFC 7845 §5.1 identification header:
// [0] version = 1, [1] channel count = 2,
// [2-3] pre_skip (little-endian uint16), [4-7] input sample rate = 0xBB80 = 48000,
// [8-9] output gain = 0, [10] channel mapping family = 0.
var tail = new byte[11];
tail[0] = 1; // version
tail[1] = 2; // channels
BinaryPrimitives.WriteUInt16LittleEndian(tail.AsSpan(2, 2), preSkip); // pre_skip
BinaryPrimitives.WriteUInt32LittleEndian(tail.AsSpan(4, 4), 48000); // input sample rate
tail[10] = 0; // channel mapping family
return Concat(OggOpusConstants.OpusHeadSignature.ToArray(), tail);
}
private static byte[] OpusTagsPacket()
{
// "OpusTags" + a tiny vendor string region (length-prefixed) + zero user comments.
var vendor = Encoding.ASCII.GetBytes("test");
var packet = new List<byte>();
packet.AddRange(OggOpusConstants.OpusTagsSignature.ToArray());
packet.AddRange(BitConverter.GetBytes((uint)vendor.Length));
packet.AddRange(vendor);
packet.AddRange(BitConverter.GetBytes(0u)); // user comment count
return packet.ToArray();
}
private static byte[] AudioPacket(int size)
{
var packet = new byte[size];
for (var i = 0; i < size; i++)
packet[i] = (byte)(i & 0xFF);
return packet;
}
private static List<ulong> CollectPageStartOffsets(byte[] stream)
{
var offsets = new List<ulong>();
var span = stream.AsSpan();
var offset = 0;
while (offset + OggOpusConstants.OggPageHeaderSize <= span.Length)
{
var page = span.Slice(offset);
if (!page[..4].SequenceEqual(OggOpusConstants.CapturePattern))
break;
var segmentCount = page[OggOpusConstants.PageSegmentCountOffset];
var payload = 0;
for (var i = 0; i < segmentCount; i++)
payload += page[OggOpusConstants.OggPageHeaderSize + i];
offsets.Add((ulong)offset);
offset += OggOpusConstants.OggPageHeaderSize + segmentCount + payload;
}
return offsets;
}
private static byte[] Concat(params byte[][] parts)
{
var total = parts.Sum(p => p.Length);
var result = new byte[total];
var cursor = 0;
foreach (var part in parts)
{
part.CopyTo(result, cursor);
cursor += part.Length;
}
return result;
}
}
@@ -77,6 +77,7 @@ public class UploadDuplicateDetectionTests
return new UnifiedTrackService( return new UnifiedTrackService(
content, sqlTrackService, fileDatabase!, waveforms, content, sqlTrackService, fileDatabase!, waveforms,
new NoOpOpusTranscodeQueue(),
NullLogger<UnifiedTrackService>.Instance); NullLogger<UnifiedTrackService>.Instance);
} }