Files
deepdrft/DeepDrftAPI/Services/Opus/OpusTranscodeBackgroundService.cs
T
daniel-c-harvey 33d6f34d8a feature: Phase 18.1 — derive Opus 320 + seek-index sidecar at ingest
Background-job transcode (ffmpeg/libopus) after source store; pure C# Ogg
walker builds the 0.5s-bucketed granule→byte seek index + captures the
OpusHead/OpusTags setup header into a per-track sidecar in a new track-opus
vault. Best-effort, additive, regenerated on replace-audio.
2026-06-23 06:30:10 -04:00

73 lines
3.0 KiB
C#

using System.Threading.Channels;
using DeepDrftContent.Processors.Opus;
namespace DeepDrftAPI.Services.Opus;
/// <summary>
/// The background worker behind <see cref="IOpusTranscodeQueue"/> (OQ6 / §3.1a). An unbounded in-process
/// channel buffers EntryKeys enqueued by the upload and replace-audio paths; a single hosted loop drains
/// them one at a time and runs <see cref="OpusTranscodeService.TranscodeAndStoreAsync"/> for each. Serial
/// by design — a transcode is CPU-heavy (§3.1), so running them concurrently would starve request
/// handling; one-at-a-time keeps the derive strictly off the hot path without saturating the host.
///
/// This worker IS the queue (implements <see cref="IOpusTranscodeQueue"/>) so enqueue and drain share one
/// channel with no extra indirection. It is registered as a singleton and surfaced under both the
/// interface and <see cref="IHostedService"/>.
/// </summary>
public sealed class OpusTranscodeBackgroundService : BackgroundService, IOpusTranscodeQueue
{
private readonly Channel<string> _channel =
Channel.CreateUnbounded<string>(new UnboundedChannelOptions { SingleReader = true });
private readonly OpusTranscodeService _transcodeService;
private readonly ILogger<OpusTranscodeBackgroundService> _logger;
public OpusTranscodeBackgroundService(
OpusTranscodeService transcodeService,
ILogger<OpusTranscodeBackgroundService> logger)
{
_transcodeService = transcodeService;
_logger = logger;
}
public void Enqueue(string entryKey)
{
if (string.IsNullOrWhiteSpace(entryKey))
return;
if (!_channel.Writer.TryWrite(entryKey))
{
// Unbounded writer only rejects after Complete(), i.e. during shutdown. The track stays
// lossless-only and is eligible for backfill, so a dropped enqueue is non-fatal — log it.
_logger.LogWarning("Opus transcode: could not enqueue {EntryKey} (queue closed).", entryKey);
}
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach (var entryKey in _channel.Reader.ReadAllAsync(stoppingToken))
{
try
{
await _transcodeService.TranscodeAndStoreAsync(entryKey, stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break; // host shutting down
}
catch (Exception ex)
{
// TranscodeAndStoreAsync already swallows expected failures; this guards the loop against
// anything unexpected so one bad track never kills the worker.
_logger.LogError(ex, "Opus transcode: unhandled failure draining {EntryKey}; worker continues.", entryKey);
}
}
}
public override Task StopAsync(CancellationToken cancellationToken)
{
_channel.Writer.TryComplete();
return base.StopAsync(cancellationToken);
}
}