feature: Phase 18.1 — derive Opus 320 + seek-index sidecar at ingest
Background-job transcode (ffmpeg/libopus) after source store; pure C# Ogg walker builds the 0.5s-bucketed granule→byte seek index + captures the OpusHead/OpusTags setup header into a per-track sidecar in a new track-opus vault. Best-effort, additive, regenerated on replace-audio.
This commit is contained in:
@@ -0,0 +1,72 @@
|
||||
using System.Threading.Channels;
|
||||
using DeepDrftContent.Processors.Opus;
|
||||
|
||||
namespace DeepDrftAPI.Services.Opus;
|
||||
|
||||
/// <summary>
|
||||
/// The background worker behind <see cref="IOpusTranscodeQueue"/> (OQ6 / §3.1a). An unbounded in-process
|
||||
/// channel buffers EntryKeys enqueued by the upload and replace-audio paths; a single hosted loop drains
|
||||
/// them one at a time and runs <see cref="OpusTranscodeService.TranscodeAndStoreAsync"/> for each. Serial
|
||||
/// by design — a transcode is CPU-heavy (§3.1), so running them concurrently would starve request
|
||||
/// handling; one-at-a-time keeps the derive strictly off the hot path without saturating the host.
|
||||
///
|
||||
/// This worker IS the queue (implements <see cref="IOpusTranscodeQueue"/>) so enqueue and drain share one
|
||||
/// channel with no extra indirection. It is registered as a singleton and surfaced under both the
|
||||
/// interface and <see cref="IHostedService"/>.
|
||||
/// </summary>
|
||||
public sealed class OpusTranscodeBackgroundService : BackgroundService, IOpusTranscodeQueue
|
||||
{
|
||||
private readonly Channel<string> _channel =
|
||||
Channel.CreateUnbounded<string>(new UnboundedChannelOptions { SingleReader = true });
|
||||
|
||||
private readonly OpusTranscodeService _transcodeService;
|
||||
private readonly ILogger<OpusTranscodeBackgroundService> _logger;
|
||||
|
||||
public OpusTranscodeBackgroundService(
|
||||
OpusTranscodeService transcodeService,
|
||||
ILogger<OpusTranscodeBackgroundService> logger)
|
||||
{
|
||||
_transcodeService = transcodeService;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public void Enqueue(string entryKey)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(entryKey))
|
||||
return;
|
||||
|
||||
if (!_channel.Writer.TryWrite(entryKey))
|
||||
{
|
||||
// Unbounded writer only rejects after Complete(), i.e. during shutdown. The track stays
|
||||
// lossless-only and is eligible for backfill, so a dropped enqueue is non-fatal — log it.
|
||||
_logger.LogWarning("Opus transcode: could not enqueue {EntryKey} (queue closed).", entryKey);
|
||||
}
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
await foreach (var entryKey in _channel.Reader.ReadAllAsync(stoppingToken))
|
||||
{
|
||||
try
|
||||
{
|
||||
await _transcodeService.TranscodeAndStoreAsync(entryKey, stoppingToken);
|
||||
}
|
||||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
break; // host shutting down
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// TranscodeAndStoreAsync already swallows expected failures; this guards the loop against
|
||||
// anything unexpected so one bad track never kills the worker.
|
||||
_logger.LogError(ex, "Opus transcode: unhandled failure draining {EntryKey}; worker continues.", entryKey);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public override Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_channel.Writer.TryComplete();
|
||||
return base.StopAsync(cancellationToken);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user