33d6f34d8a
Background-job transcode (ffmpeg/libopus) after source store; pure C# Ogg walker builds the 0.5s-bucketed granule→byte seek index + captures the OpusHead/OpusTags setup header into a per-track sidecar in a new track-opus vault. Best-effort, additive, regenerated on replace-audio.
73 lines
3.0 KiB
C#
73 lines
3.0 KiB
C#
using System.Threading.Channels;
|
|
using DeepDrftContent.Processors.Opus;
|
|
|
|
namespace DeepDrftAPI.Services.Opus;
|
|
|
|
/// <summary>
|
|
/// The background worker behind <see cref="IOpusTranscodeQueue"/> (OQ6 / §3.1a). An unbounded in-process
|
|
/// channel buffers EntryKeys enqueued by the upload and replace-audio paths; a single hosted loop drains
|
|
/// them one at a time and runs <see cref="OpusTranscodeService.TranscodeAndStoreAsync"/> for each. Serial
|
|
/// by design — a transcode is CPU-heavy (§3.1), so running them concurrently would starve request
|
|
/// handling; one-at-a-time keeps the derive strictly off the hot path without saturating the host.
|
|
///
|
|
/// This worker IS the queue (implements <see cref="IOpusTranscodeQueue"/>) so enqueue and drain share one
|
|
/// channel with no extra indirection. It is registered as a singleton and surfaced under both the
|
|
/// interface and <see cref="IHostedService"/>.
|
|
/// </summary>
|
|
public sealed class OpusTranscodeBackgroundService : BackgroundService, IOpusTranscodeQueue
|
|
{
|
|
private readonly Channel<string> _channel =
|
|
Channel.CreateUnbounded<string>(new UnboundedChannelOptions { SingleReader = true });
|
|
|
|
private readonly OpusTranscodeService _transcodeService;
|
|
private readonly ILogger<OpusTranscodeBackgroundService> _logger;
|
|
|
|
public OpusTranscodeBackgroundService(
|
|
OpusTranscodeService transcodeService,
|
|
ILogger<OpusTranscodeBackgroundService> logger)
|
|
{
|
|
_transcodeService = transcodeService;
|
|
_logger = logger;
|
|
}
|
|
|
|
public void Enqueue(string entryKey)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(entryKey))
|
|
return;
|
|
|
|
if (!_channel.Writer.TryWrite(entryKey))
|
|
{
|
|
// Unbounded writer only rejects after Complete(), i.e. during shutdown. The track stays
|
|
// lossless-only and is eligible for backfill, so a dropped enqueue is non-fatal — log it.
|
|
_logger.LogWarning("Opus transcode: could not enqueue {EntryKey} (queue closed).", entryKey);
|
|
}
|
|
}
|
|
|
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
|
{
|
|
await foreach (var entryKey in _channel.Reader.ReadAllAsync(stoppingToken))
|
|
{
|
|
try
|
|
{
|
|
await _transcodeService.TranscodeAndStoreAsync(entryKey, stoppingToken);
|
|
}
|
|
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
|
{
|
|
break; // host shutting down
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
// TranscodeAndStoreAsync already swallows expected failures; this guards the loop against
|
|
// anything unexpected so one bad track never kills the worker.
|
|
_logger.LogError(ex, "Opus transcode: unhandled failure draining {EntryKey}; worker continues.", entryKey);
|
|
}
|
|
}
|
|
}
|
|
|
|
public override Task StopAsync(CancellationToken cancellationToken)
|
|
{
|
|
_channel.Writer.TryComplete();
|
|
return base.StopAsync(cancellationToken);
|
|
}
|
|
}
|