Merge Phase 21.2 (streaming back-pressure) into streaming-overhaul
This commit is contained in:
@@ -159,6 +159,25 @@ public class AudioInteropService : IAsyncDisposable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
/// Back-pressure poll (Phase 21.2a). Invokes <c>DeepDrftAudio.isProductionPaused</c> for the
/// given player and reports whether the scheduler is still over its forward high/low-water band.
/// </summary>
/// <remarks>
/// The read loop calls this only WHILE it is already throttled, to learn when it may resume
/// reading. The unthrottled steady-state loop reads the piggybacked <c>ProductionPaused</c> flag
/// off each chunk result instead.
/// </remarks>
/// <param name="playerId">Identifier of the player, forwarded unchanged to the JS call.</param>
/// <returns>
/// The boolean returned by the JS call, or <c>false</c> when the interop call throws for any
/// reason, so a torn-down player never wedges a loop that is exiting anyway.
/// </returns>
public async Task<bool> IsProductionPaused(string playerId)
{
    bool stillPaused;

    try
    {
        stillPaused = await _jsRuntime.InvokeAsync<bool>("DeepDrftAudio.isProductionPaused", playerId);
    }
    catch
    {
        // Deliberate best-effort: any interop failure is reported as "not paused".
        stillPaused = false;
    }

    return stillPaused;
}
|
||||||
|
|
||||||
public async Task<AudioOperationResult> ReinitializeFromOffset(string playerId, long totalStreamLength, double seekPosition)
|
public async Task<AudioOperationResult> ReinitializeFromOffset(string playerId, long totalStreamLength, double seekPosition)
|
||||||
{
|
{
|
||||||
return await InvokeJsAsync<AudioOperationResult>("DeepDrftAudio.reinitializeFromOffset", playerId, totalStreamLength, seekPosition);
|
return await InvokeJsAsync<AudioOperationResult>("DeepDrftAudio.reinitializeFromOffset", playerId, totalStreamLength, seekPosition);
|
||||||
@@ -419,6 +438,11 @@ public class StreamingResult : AudioOperationResult
|
|||||||
public bool HeaderParsed { get; set; }
|
public bool HeaderParsed { get; set; }
|
||||||
public int BufferCount { get; set; }
|
public int BufferCount { get; set; }
|
||||||
public double? Duration { get; set; } // Duration in seconds calculated from WAV header
|
public double? Duration { get; set; } // Duration in seconds calculated from WAV header
|
||||||
|
|
||||||
|
// Phase 21.2a back-pressure: true when the scheduler's forward decoded fill is over the
|
||||||
|
// high-water mark and the C# read loop should stop calling ReadAsync until it drains. Read off
|
||||||
|
// the chunk result the loop already awaits — no extra interop hop in the unthrottled steady state.
|
||||||
|
public bool ProductionPaused { get; set; }
|
||||||
}
|
}
|
||||||
|
|
||||||
public class AudioPlayerState
|
public class AudioPlayerState
|
||||||
|
|||||||
@@ -16,6 +16,14 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
|
|||||||
// Adaptive chunk sizing
|
// Adaptive chunk sizing
|
||||||
private const int MinBufferSize = 16 * 1024; // 16KB minimum
|
private const int MinBufferSize = 16 * 1024; // 16KB minimum
|
||||||
private const int MaxBufferSize = 64 * 1024; // 64KB maximum
|
private const int MaxBufferSize = 64 * 1024; // 64KB maximum
|
||||||
|
|
||||||
|
// Phase 21.2a back-pressure poll interval. While the scheduler is over its forward high-water
|
||||||
|
// mark, the read loop stops calling ReadAsync and polls IsProductionPaused at this cadence
|
||||||
|
// until the fill drains below low-water. 100 ms is well under the low-water lookahead (seconds),
|
||||||
|
// so resume is prompt relative to the playhead — no starvation (AC3) — while keeping the poll
|
||||||
|
// cheap. The poll honors the loop's cancellation token, so a track switch/seek during a pause
|
||||||
|
// exits through the same drain discipline as a pause during ReadAsync (C6).
|
||||||
|
private const int BackpressurePollMs = 100;
|
||||||
private int _currentBufferSize = DefaultBufferSize;
|
private int _currentBufferSize = DefaultBufferSize;
|
||||||
private int _consecutiveSlowReads = 0;
|
private int _consecutiveSlowReads = 0;
|
||||||
|
|
||||||
@@ -465,6 +473,31 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
|
|||||||
}
|
}
|
||||||
|
|
||||||
await ThrottledNotifyStateChanged();
|
await ThrottledNotifyStateChanged();
|
||||||
|
|
||||||
|
// Phase 21.2a back-pressure (serves BOTH paths). The chunk we just processed
|
||||||
|
// reported the scheduler's forward fill is over the high-water mark — stop
|
||||||
|
// reading the socket so the unplayed decoded region stays bounded. Pausing
|
||||||
|
// ReadAsync lets the kernel TCP window close (we are working WITH transport flow
|
||||||
|
// control, not against it). Poll until the fill drains below low-water, then
|
||||||
|
// resume the loop. For WAV this is the whole story (StreamDecoder decodes
|
||||||
|
// synchronously into the scheduler); the Opus feed additionally self-throttles
|
||||||
|
// its demux/decode off the SAME signal (21.2b), so its upstream queues stay
|
||||||
|
// near-empty too. The poll awaits on cancellationToken, so a track switch/seek
|
||||||
|
// mid-pause throws OCE and unwinds through the existing drain discipline (C6) —
|
||||||
|
// no separate cancellation path, no stale read racing a reinit.
|
||||||
|
if (chunkResult.ProductionPaused)
|
||||||
|
{
|
||||||
|
// UC5: while the user is paused, the playhead is frozen so forward lookahead
|
||||||
|
// never shrinks and the poll would spin indefinitely. Wait here until playback
|
||||||
|
// resumes (IsPaused clears) AND the fill has drained (poll returns false). Cancellation is
|
||||||
|
// unchanged: a track switch/seek/stop while paused still throws OCE and unwinds
|
||||||
|
// through the existing drain discipline (C6) — no weakening of the cancel path.
|
||||||
|
while (IsPaused || await _audioInterop.IsProductionPaused(PlayerId))
|
||||||
|
{
|
||||||
|
cancellationToken.ThrowIfCancellationRequested();
|
||||||
|
await Task.Delay(BackpressurePollMs, cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} while (currentBytes > 0);
|
} while (currentBytes > 0);
|
||||||
|
|
||||||
|
|||||||
@@ -30,6 +30,10 @@ export interface StreamingResult extends AudioResult {
|
|||||||
headerParsed?: boolean;
|
headerParsed?: boolean;
|
||||||
bufferCount?: number;
|
bufferCount?: number;
|
||||||
duration?: number;
|
duration?: number;
|
||||||
|
// Phase 21.2a back-pressure signal piggybacked on the chunk result the C# read loop already
|
||||||
|
// awaits — true means the scheduler's forward fill is over the high-water mark and the loop
|
||||||
|
// should stop calling ReadAsync until it drains (no extra interop hop in the common case).
|
||||||
|
productionPaused?: boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface AudioState {
|
export interface AudioState {
|
||||||
@@ -133,7 +137,14 @@ export class AudioPlayer {
|
|||||||
// selects Opus when the sidecar parsed, so the null branch is defensive.
|
// selects Opus when the sidecar parsed, so the null branch is defensive.
|
||||||
if (this.isOpusContentType(contentType) && this.pendingOpusSidecar) {
|
if (this.isOpusContentType(contentType) && this.pendingOpusSidecar) {
|
||||||
this.activeOpusSidecar = this.pendingOpusSidecar;
|
this.activeOpusSidecar = this.pendingOpusSidecar;
|
||||||
this.opusDecoder = new OpusStreamDecoder(this.contextManager, this.pendingOpusSidecar);
|
// Pass the shared back-pressure signal (21.2b): the Opus decoder stops demuxing/
|
||||||
|
// decoding new packets while the scheduler is full, so the WebCodecs decode queue
|
||||||
|
// and decodedQueue do not balloon behind a throttled socket (OQ7). Same signal the
|
||||||
|
// C# read loop honors — one policy, two thin hooks.
|
||||||
|
this.opusDecoder = new OpusStreamDecoder(
|
||||||
|
this.contextManager,
|
||||||
|
this.pendingOpusSidecar,
|
||||||
|
() => this.scheduler.evaluateProductionPause());
|
||||||
return { success: true };
|
return { success: true };
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -263,7 +274,8 @@ export class AudioPlayer {
|
|||||||
canStartStreaming: canStart,
|
canStartStreaming: canStart,
|
||||||
headerParsed,
|
headerParsed,
|
||||||
bufferCount: this.scheduler.getBufferCount(),
|
bufferCount: this.scheduler.getBufferCount(),
|
||||||
duration: this.duration
|
duration: this.duration,
|
||||||
|
productionPaused: this.scheduler.evaluateProductionPause()
|
||||||
};
|
};
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
return { success: false, error: (error as Error).message };
|
return { success: false, error: (error as Error).message };
|
||||||
@@ -307,7 +319,8 @@ export class AudioPlayer {
|
|||||||
canStartStreaming: canStart,
|
canStartStreaming: canStart,
|
||||||
headerParsed: this.streamDecoder.headerParsed,
|
headerParsed: this.streamDecoder.headerParsed,
|
||||||
bufferCount: this.scheduler.getBufferCount(),
|
bufferCount: this.scheduler.getBufferCount(),
|
||||||
duration: this.duration
|
duration: this.duration,
|
||||||
|
productionPaused: this.scheduler.evaluateProductionPause()
|
||||||
};
|
};
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
return { success: false, error: (error as Error).message };
|
return { success: false, error: (error as Error).message };
|
||||||
@@ -508,6 +521,17 @@ export class AudioPlayer {
|
|||||||
return this.scheduler.getTotalDuration() + this.scheduler.getPlaybackOffset();
|
return this.scheduler.getTotalDuration() + this.scheduler.getPlaybackOffset();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Poll endpoint for the shared back-pressure signal (Phase 21.2a). Delegates to
 * `scheduler.evaluateProductionPause()` and returns its result unchanged.
 *
 * The C# read loop polls this only WHILE it is already throttled, to learn when the forward
 * fill has drained below the low-water mark and it may resume reading. The steady-state
 * (unthrottled) loop never calls this: it reads the piggybacked `productionPaused` flag off
 * each chunk result instead, so there is no extra interop hop until back-pressure engages.
 *
 * @returns true while production should stay paused, false once it may resume.
 */
isProductionPaused(): boolean {
    const paused = this.scheduler.evaluateProductionPause();
    return paused;
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Calculate byte offset for a time position (for C# layer)
|
* Calculate byte offset for a time position (for C# layer)
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -448,6 +448,65 @@ test('OpusStreamDecoder.totalDuration is null when the sidecar carries no positi
|
|||||||
assertEqual(decoder.totalDuration, null, 'no positive duration -> null');
|
assertEqual(decoder.totalDuration, null, 'no positive duration -> null');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// --- Phase 21.2b: Opus decode-ahead back-pressure (the stash-while-full half) ------------------
|
||||||
|
//
|
||||||
|
// When the shared scheduler is full, push() must NOT demux/decode ahead — it stashes the raw bytes
|
||||||
|
// and returns nothing, so the WebCodecs decode queue and decodedQueue stay near-empty (OQ7). The
|
||||||
|
// stash-while-full branch returns BEFORE ensureConfigured(), so it is testable without WebCodecs
|
||||||
|
// (no AudioDecoder is constructed). The drain-on-resume path needs the real WebCodecs decoder and
|
||||||
|
// stays browser-verified; here we pin the bound itself and the lifecycle resets.
|
||||||
|
|
||||||
|
// White-box helper: reads the decoder's private `pendingBytes` stash and returns how many chunks
// it currently holds (same private-access idiom the scheduler tests use).
function stashLength(decoder: OpusStreamDecoder): number {
    const internals = decoder as unknown as { pendingBytes: Uint8Array[] };
    return internals.pendingBytes.length;
}
|
||||||
|
|
||||||
|
// push() takes its stash-while-full branch synchronously, before any real await, so the stash can
// be inspected right after the call without awaiting the returned promise. That keeps this test
// inside the synchronous inline harness, which does not await test bodies.
test('push stashes bytes and decodes nothing while the scheduler is full (no decode-ahead)', () => {
    const seekData = sidecarFrom({
        setupHeader: [0x4f, 0x70, 0x75, 0x73, 0x48, 0x65, 0x61, 0x64],
        totalByteLength: 500_000,
        totalDuration: 100,
        preSkip: 312,
        points: [{ granule: 312, byteOffset: 4096 }],
    });

    // The hook always reports "full", so push must short-circuit before touching WebCodecs.
    const alwaysFull = (): boolean => true;
    const throttled = new OpusStreamDecoder(stubContextManager, seekData, alwaysFull);

    const chunks = [new Uint8Array([1, 2, 3]), new Uint8Array([4, 5])];
    for (const chunk of chunks) {
        void throttled.push(chunk);
    }

    assertEqual(stashLength(throttled), 2, 'both chunks stashed in arrival order');
    assertEqual(throttled.ready, false, 'decoder not even configured while throttled');
});
|
||||||
|
|
||||||
|
// A seek (range-continuation reinit) must discard whatever was stashed before it (C6).
test('reinitializeForRangeContinuation drops the pre-seek stash (C6 — no stale feed across reset)', () => {
    const seekData = sidecarFrom({
        setupHeader: [0x4f, 0x70, 0x75, 0x73, 0x48, 0x65, 0x61, 0x64],
        totalByteLength: 500_000,
        totalDuration: 100,
        preSkip: 312,
        points: [{ granule: 312, byteOffset: 4096 }],
    });
    const throttled = new OpusStreamDecoder(stubContextManager, seekData, () => true);

    // Stash one chunk while the hook reports "full".
    void throttled.push(new Uint8Array([1, 2, 3]));
    const stashedBeforeSeek = stashLength(throttled);
    assertEqual(stashedBeforeSeek, 1, 'one chunk stashed pre-seek');

    // Simulate a seek.
    throttled.reinitializeForRangeContinuation(0, 5);
    const stashedAfterSeek = stashLength(throttled);
    assertEqual(stashedAfterSeek, 0, 'pre-seek stash dropped on range-continuation');
});
|
||||||
|
|
||||||
|
// dispose() must leave no stashed bytes behind.
test('dispose clears the stash', () => {
    const seekData = sidecarFrom({
        setupHeader: [0x4f, 0x70, 0x75, 0x73, 0x48, 0x65, 0x61, 0x64],
        totalByteLength: 500_000,
        totalDuration: 100,
        preSkip: 312,
        points: [{ granule: 312, byteOffset: 4096 }],
    });
    const throttled = new OpusStreamDecoder(stubContextManager, seekData, () => true);

    // One chunk goes into the stash because the hook reports "full".
    void throttled.push(new Uint8Array([9]));
    const stashedBeforeDispose = stashLength(throttled);
    assertEqual(stashedBeforeDispose, 1, 'stashed');

    throttled.dispose();
    const stashedAfterDispose = stashLength(throttled);
    assertEqual(stashedAfterDispose, 0, 'stash cleared on dispose');
});
|
||||||
|
|
||||||
function concat(arrs: Uint8Array[]): Uint8Array {
|
function concat(arrs: Uint8Array[]): Uint8Array {
|
||||||
let len = 0;
|
let len = 0;
|
||||||
for (const a of arrs) len += a.length;
|
for (const a of arrs) len += a.length;
|
||||||
|
|||||||
@@ -39,6 +39,17 @@ const MAX_PACKET_FRAMES = 5760;
|
|||||||
export class OpusStreamDecoder implements IStreamingDecoder {
|
export class OpusStreamDecoder implements IStreamingDecoder {
|
||||||
private readonly contextManager: AudioContextManager;
|
private readonly contextManager: AudioContextManager;
|
||||||
private readonly sidecar: OpusSeekData;
|
private readonly sidecar: OpusSeekData;
|
||||||
|
// Phase 21.2b back-pressure hook: returns true when the shared scheduler is full (forward fill
|
||||||
|
// over high-water). While full, push() stashes raw bytes WITHOUT demuxing/decoding so the
|
||||||
|
// WebCodecs decode queue and decodedQueue stay near-empty behind a throttled socket (OQ7).
|
||||||
|
// Null = no back-pressure (e.g. unit tests), in which case the decoder feeds eagerly as before.
|
||||||
|
private readonly isSchedulerFull: (() => boolean) | null;
|
||||||
|
|
||||||
|
// Raw bytes received while the scheduler was full, held undemuxed until it drains. The C# read
|
||||||
|
// loop also pauses above high-water, so this stash is bounded to at most the in-flight chunks
|
||||||
|
// between the loop reading the productionPaused flag and actually stopping — a handful of KB,
|
||||||
|
// not a decode-ahead. Drained (demuxed + decoded) on the next push once below high-water.
|
||||||
|
private pendingBytes: Uint8Array[] = [];
|
||||||
|
|
||||||
private demuxer = new OggDemuxer();
|
private demuxer = new OggDemuxer();
|
||||||
private decoder: AudioDecoder | null = null;
|
private decoder: AudioDecoder | null = null;
|
||||||
@@ -66,9 +77,13 @@ export class OpusStreamDecoder implements IStreamingDecoder {
|
|||||||
private emittedFrames = 0;
|
private emittedFrames = 0;
|
||||||
private readonly totalFrames: number;
|
private readonly totalFrames: number;
|
||||||
|
|
||||||
constructor(contextManager: AudioContextManager, sidecar: OpusSeekData) {
|
constructor(
|
||||||
|
contextManager: AudioContextManager,
|
||||||
|
sidecar: OpusSeekData,
|
||||||
|
isSchedulerFull: (() => boolean) | null = null) {
|
||||||
this.contextManager = contextManager;
|
this.contextManager = contextManager;
|
||||||
this.sidecar = sidecar;
|
this.sidecar = sidecar;
|
||||||
|
this.isSchedulerFull = isSchedulerFull;
|
||||||
this.totalFrames = sidecar.totalDurationSeconds > 0
|
this.totalFrames = sidecar.totalDurationSeconds > 0
|
||||||
? Math.round(sidecar.totalDurationSeconds * OPUS_SAMPLE_RATE)
|
? Math.round(sidecar.totalDurationSeconds * OPUS_SAMPLE_RATE)
|
||||||
: Number.POSITIVE_INFINITY;
|
: Number.POSITIVE_INFINITY;
|
||||||
@@ -136,17 +151,65 @@ export class OpusStreamDecoder implements IStreamingDecoder {
|
|||||||
|
|
||||||
async push(chunk: Uint8Array): Promise<AudioBuffer[]> {
|
async push(chunk: Uint8Array): Promise<AudioBuffer[]> {
|
||||||
if (this.fatalError) return [];
|
if (this.fatalError) return [];
|
||||||
|
|
||||||
|
// 21.2b back-pressure: while the scheduler is full, do NOT demux/decode ahead. Stash the
|
||||||
|
// raw bytes in arrival order and return nothing — the WebCodecs decode queue and
|
||||||
|
// decodedQueue stay near-empty (OQ7). The bytes are demuxed/decoded on a later push once
|
||||||
|
// the scheduler has drained below low-water, in exactly the order received (Ogg demux is
|
||||||
|
// order-sensitive). configure() is deferred too — no need to spin up the decoder while
|
||||||
|
// throttled. The C# loop also stops reading above high-water, so the stash stays small.
|
||||||
|
if (this.isSchedulerFull?.()) {
|
||||||
|
this.pendingBytes.push(chunk);
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
if (!(await this.ensureConfigured())) return [];
|
if (!(await this.ensureConfigured())) return [];
|
||||||
|
|
||||||
const packets = this.demuxer.push(chunk);
|
// Drained below high-water: replay any stashed bytes first (preserving stream order), then
|
||||||
this.decodePackets(packets);
|
// the new chunk, through the demuxer as one contiguous feed.
|
||||||
|
const out: AudioBuffer[] = [];
|
||||||
|
if (this.pendingBytes.length > 0) {
|
||||||
|
const stashed = this.pendingBytes;
|
||||||
|
this.pendingBytes = [];
|
||||||
|
for (const bytes of stashed) {
|
||||||
|
this.decodePackets(this.demuxer.push(bytes));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.decodePackets(this.demuxer.push(chunk));
|
||||||
// Wait until the WebCodecs decoder has processed the queued packets before draining.
|
// Wait until the WebCodecs decoder has processed the queued packets before draining.
|
||||||
await this.yieldToDecoder();
|
await this.yieldToDecoder();
|
||||||
return this.drainDecoded();
|
out.push(...this.drainDecoded());
|
||||||
|
return out;
|
||||||
}
|
}
|
||||||
|
|
||||||
async complete(): Promise<AudioBuffer[]> {
|
async complete(): Promise<AudioBuffer[]> {
|
||||||
if (this.fatalError || !this.decoder || this.decoder.state !== 'configured') {
|
if (this.fatalError) {
|
||||||
|
return this.drainDecoded();
|
||||||
|
}
|
||||||
|
|
||||||
|
// End-of-stream may arrive while still throttled with bytes stashed (e.g. a short track
|
||||||
|
// that finished sending before the scheduler drained). Configure if needed and replay the
|
||||||
|
// stash so the tail is decoded before flush — otherwise the final seconds would be lost.
|
||||||
|
//
|
||||||
|
// OQ7/AC1-Opus precision note: the stash here is drained in full without a water-mark
|
||||||
|
// check. This is intentionally correct: the stream has ended — you cannot back-pressure a
|
||||||
|
// finished stream — and the remainder is tail-only (bounded by whatever the throttled C#
|
||||||
|
// loop left in flight, which is at most one push() worth of bytes). Adding a water-mark
|
||||||
|
// gate to complete() would silently drop the track's tail and is therefore wrong.
|
||||||
|
if (this.pendingBytes.length > 0) {
|
||||||
|
if (await this.ensureConfigured()) {
|
||||||
|
const stashed = this.pendingBytes;
|
||||||
|
this.pendingBytes = [];
|
||||||
|
for (const bytes of stashed) {
|
||||||
|
this.decodePackets(this.demuxer.push(bytes));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
this.pendingBytes = [];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!this.decoder || this.decoder.state !== 'configured') {
|
||||||
return this.drainDecoded();
|
return this.drainDecoded();
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
@@ -184,6 +247,10 @@ export class OpusStreamDecoder implements IStreamingDecoder {
|
|||||||
// continuation mode (treat the first page's packets as audio — no setup pages in a 206 body).
|
// continuation mode (treat the first page's packets as audio — no setup pages in a 206 body).
|
||||||
this.demuxer.reset(true);
|
this.demuxer.reset(true);
|
||||||
this.decodedQueue = [];
|
this.decodedQueue = [];
|
||||||
|
// Drop any bytes stashed by back-pressure: they belong to the PRE-seek stream position and
|
||||||
|
// must never be replayed against the post-seek (range-continuation) demux state (C6 — no
|
||||||
|
// stale feed racing the reset).
|
||||||
|
this.pendingBytes = [];
|
||||||
this.emittedFrames = 0; // post-seek buffers are positioned by the scheduler's playbackOffset
|
this.emittedFrames = 0; // post-seek buffers are positioned by the scheduler's playbackOffset
|
||||||
// Arm the lead trim: skip enough decoded frames to land at targetTimeSeconds, not at
|
// Arm the lead trim: skip enough decoded frames to land at targetTimeSeconds, not at
|
||||||
// landingTimeSeconds (the page start). Clamp to ≥0 to guard against floating-point rounding.
|
// landingTimeSeconds (the page start). Clamp to ≥0 to guard against floating-point rounding.
|
||||||
@@ -199,6 +266,7 @@ export class OpusStreamDecoder implements IStreamingDecoder {
|
|||||||
try { d.close(); } catch { /* already closed */ }
|
try { d.close(); } catch { /* already closed */ }
|
||||||
}
|
}
|
||||||
this.decodedQueue = [];
|
this.decodedQueue = [];
|
||||||
|
this.pendingBytes = [];
|
||||||
if (this.decoder && this.decoder.state !== 'closed') {
|
if (this.decoder && this.decoder.state !== 'closed') {
|
||||||
try { this.decoder.close(); } catch { /* already closed */ }
|
try { this.decoder.close(); } catch { /* already closed */ }
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -85,6 +85,14 @@ function buf(duration: number): AudioBuffer {
|
|||||||
return { duration } as AudioBuffer;
|
return { duration } as AudioBuffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Builds a fake decoded buffer that also carries the byte-footprint fields (`length` and
 * `numberOfChannels`) used by the OQ3 byte-ceiling test. Models 48 kHz stereo float PCM.
 *
 * @param duration buffer duration in seconds
 * @returns an object cast to AudioBuffer with `length = round(duration × 48000)` frames and 2 channels
 */
function bufBytes(duration: number): AudioBuffer {
    const frames = Math.round(duration * 48000);
    const fake = { duration, length: frames, numberOfChannels: 2 };
    return fake as AudioBuffer;
}
|
||||||
|
|
||||||
function makeScheduler(cm: FakeContextManager): PlaybackScheduler {
|
function makeScheduler(cm: FakeContextManager): PlaybackScheduler {
|
||||||
// The scheduler only uses the subset FakeContextManager implements.
|
// The scheduler only uses the subset FakeContextManager implements.
|
||||||
return new PlaybackScheduler(cm as unknown as AudioContextManager);
|
return new PlaybackScheduler(cm as unknown as AudioContextManager);
|
||||||
@@ -325,6 +333,114 @@ test('eviction via handleSourceEnded: position exact, live bufferIndex decrement
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// === Phase 21.2 back-pressure: the forward water-mark signal =================================
|
||||||
|
//
|
||||||
|
// The signal is pure given the clock + buffer durations + the playhead position, so it is
|
||||||
|
// testable in Node with the same fakes. We drive forward lookahead by adding buffers (fill) and
|
||||||
|
// advancing the clock (drain), and assert the hysteresis latch and the OQ3 byte ceiling.
|
||||||
|
|
||||||
|
/**
 * Test fixture: adds `count` one-second buffers to the scheduler, sets the fake clock to 0,
 * starts playback from position 0, then advances the schedule cursor to the end so that
 * nextBufferIndex does not pin anything. All `count` buffers stay decoded; the caller positions
 * the playhead afterwards by setting the clock.
 *
 * @param s scheduler under test
 * @param cm fake context manager whose `now` acts as the clock
 * @param count number of 1 s buffers to add
 */
function fillAndStart(s: PlaybackScheduler, cm: FakeContextManager, count: number): void {
    let remaining = count;
    while (remaining > 0) {
        s.addBuffer(buf(1));
        remaining--;
    }

    cm.now = 0;
    s.playFromPosition(0);
    advanceCursorToEnd(s, cm);
}
|
||||||
|
|
||||||
|
// Reaching the high-water mark pauses production; the signal follows the forward lookahead.
test('evaluateProductionPause latches true when forward lookahead reaches high-water', () => {
    const clock = new FakeContextManager();
    const scheduler = makeScheduler(clock);

    // High-water 10 s, low-water 5 s, byte cap disabled.
    scheduler.setForwardWindow(10, 5, 0);
    // 40 s decoded, track [0, 40).
    fillAndStart(scheduler, clock, 40);

    // Playhead at 0 → forward lookahead = 40 s, which is ≥ the 10 s high-water mark.
    clock.now = 0;
    const lookahead = scheduler.getForwardLookaheadSeconds();
    assertEqual(lookahead, 40, 'lookahead is full decoded tail at t=0');
    const paused = scheduler.evaluateProductionPause();
    assertEqual(paused, true, 'pauses at/above high-water');
});
|
||||||
|
|
||||||
|
// Inside the band (above low-water, below high-water) without a prior high-water crossing, the
// signal must stay false — no premature pause.
test('evaluateProductionPause stays false in the hysteresis band before the high-water crossing', () => {
    const clock = new FakeContextManager();
    const scheduler = makeScheduler(clock);
    scheduler.setForwardWindow(10, 5, 0);

    // 8 s decoded.
    fillAndStart(scheduler, clock, 8);

    // Lookahead 8 s sits between low (5) and high (10) and was never latched → unpaused.
    clock.now = 0;
    const paused = scheduler.evaluateProductionPause();
    assertEqual(paused, false, 'no pause until high-water is actually reached');
});
|
||||||
|
|
||||||
|
// Hysteresis: after latching at high-water the signal stays true through the band and only
// releases once lookahead has drained to the low-water side. Drain is modeled by advancing the
// clock (the playhead moves forward, shrinking forward lookahead).
test('evaluateProductionPause holds through the band and resumes only below low-water', () => {
    const clock = new FakeContextManager();
    const scheduler = makeScheduler(clock);
    scheduler.setForwardWindow(10, 5, 0);
    // Track [0, 40).
    fillAndStart(scheduler, clock, 40);

    clock.now = 0;
    assertEqual(scheduler.evaluateProductionPause(), true, 'latched at high-water (40s ahead)');

    const drainSteps = [
        // Playhead at 32 → lookahead 8 s: inside the band (5..10) → must STAY paused.
        {
            playhead: 32,
            lookahead: 8,
            lookaheadMsg: 'lookahead drained to 8s',
            paused: true,
            pausedMsg: 'still paused inside the band',
        },
        // Playhead at 36 → lookahead 4 s, under the 5 s low-water mark → resume.
        {
            playhead: 36,
            lookahead: 4,
            lookaheadMsg: 'lookahead below low-water',
            paused: false,
            pausedMsg: 'resumes below low-water',
        },
    ];
    for (const step of drainSteps) {
        clock.now = step.playhead;
        assertEqual(scheduler.getForwardLookaheadSeconds(), step.lookahead, step.lookaheadMsg);
        assertEqual(scheduler.evaluateProductionPause(), step.paused, step.pausedMsg);
    }

    // Refilling back over high-water re-latches (the next chunk would re-pause): +20 s ahead.
    let extra = 20;
    while (extra > 0) {
        scheduler.addBuffer(buf(1));
        extra--;
    }
    assertEqual(scheduler.evaluateProductionPause(), true, 're-latches when fill exceeds high-water again');
});
|
||||||
|
|
||||||
|
// OQ3 hard byte ceiling: pauses production independently of the time window, and is described as
// releasing as soon as the footprint is back under the cap (no separate low-water band on the
// byte guard). This test pins only the pause side.
test('OQ3 byte ceiling pauses regardless of the time window', () => {
    const clock = new FakeContextManager();
    const scheduler = makeScheduler(clock);

    // One 1 s buffer = 48000 frames × 2 ch × 4 bytes = 384000 bytes; the cap (~1.5 MB) is 4 buffers.
    const bytesPerBuffer = 48000 * 2 * 4;
    const byteCap = bytesPerBuffer * 4;

    // Time window is huge so only the byte cap can fire.
    scheduler.setForwardWindow(1000, 500, byteCap);

    // Six buffers exceed the four-buffer cap.
    let toAdd = 6;
    while (toAdd > 0) {
        scheduler.addBuffer(bufBytes(1));
        toAdd--;
    }
    clock.now = 0;
    scheduler.playFromPosition(0);
    advanceCursorToEnd(scheduler, clock);

    clock.now = 0;
    // Guard the fixture itself: the estimate must really be over the cap before asserting.
    const estimate = scheduler.getDecodedByteEstimate();
    if (estimate <= byteCap) {
        throw new Error('test setup: byte estimate should exceed the cap');
    }
    assertEqual(scheduler.evaluateProductionPause(), true, 'byte ceiling pauses even with a huge time window');
});
|
||||||
|
|
||||||
|
// Both clear() and clearForSeek() must release the latch so that a fresh stream or seek starts
// unthrottled (C2).
test('clear and clearForSeek release the back-pressure latch (C2 latency parity)', () => {
    const clock = new FakeContextManager();
    const scheduler = makeScheduler(clock);
    scheduler.setForwardWindow(10, 5, 0);

    const resets = [
        {
            reset: (): void => scheduler.clear(),
            latchedMsg: 'latched',
            // After clear there are no buffers, lookahead is 0, and the latch is reset → unpaused.
            releasedMsg: 'clear resets the latch and empties fill',
        },
        {
            reset: (): void => scheduler.clearForSeek(),
            latchedMsg: 'latched again after refill',
            releasedMsg: 'clearForSeek resets the latch',
        },
    ];

    for (const scenario of resets) {
        // Fill far over high-water and confirm the latch engages before resetting.
        fillAndStart(scheduler, clock, 40);
        clock.now = 0;
        assertEqual(scheduler.evaluateProductionPause(), true, scenario.latchedMsg);

        scenario.reset();
        assertEqual(scheduler.evaluateProductionPause(), false, scenario.releasedMsg);
    }
});
|
||||||
|
|
||||||
// --- run -------------------------------------------------------------------------------------
|
// --- run -------------------------------------------------------------------------------------
|
||||||
if (failures.length > 0) {
|
if (failures.length > 0) {
|
||||||
console.error(failures.join('\n'));
|
console.error(failures.join('\n'));
|
||||||
|
|||||||
@@ -36,6 +36,33 @@ import { AudioContextManager } from './AudioContextManager.js';
|
|||||||
*/
|
*/
|
||||||
const DEFAULT_BACK_RETAIN_SECONDS = 10;
|
const DEFAULT_BACK_RETAIN_SECONDS = 10;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Forward back-pressure water-marks (Phase 21.2 — the bound on the *unplayed* region).
|
||||||
|
*
|
||||||
|
* The single back-pressure signal is the scheduler's decoded forward lookahead: how many
|
||||||
|
* seconds of decoded audio sit AHEAD of the playhead (OQ7). Production (the C# read loop and,
|
||||||
|
* for Opus, the demux/decode feed) pauses above the high-water mark and resumes below the
|
||||||
|
* low-water mark — classic hysteresis so the two producers do not chatter on/off per chunk.
|
||||||
|
*
|
||||||
|
* Provisional time-based defaults (OQ1 — 21.4 tunes them):
|
||||||
|
* - HIGH (30 s): the most decoded lookahead we hold ahead of the playhead before throttling.
|
||||||
|
* Comfortably above the playback-start minimum (`AudioPlayer.minBuffersForPlayback = 6`
|
||||||
|
* buffers, each typically 0.06 – 1 s depending on format/chunk size; at most a few seconds
|
||||||
|
* even at the high end), so C2 holds — first audio never waits on a throttle (the high-water
|
||||||
|
* is reached only well after playback is already running).
|
||||||
|
* - LOW (15 s): resume producing here. Kept generous so the forward fill never drains to the
|
||||||
|
* ~500 ms scheduler lookahead under normal network jitter (AC3 — no starvation).
|
||||||
|
*
|
||||||
|
* OQ3 hard memory ceiling: an absolute byte cap on total decoded float held, independent of the
|
||||||
|
* time window. This is the guard-rail that makes "1 GB never OOMs" a guarantee rather than a
|
||||||
|
* tuning hope — if a pathological stream packs an unusual amount of decoded audio into the time
|
||||||
|
* window, the byte cap still pauses production. Estimated as channels × frames × 4 bytes (f32).
|
||||||
|
*/
|
||||||
|
// Forward back-pressure defaults (seconds of decoded audio ahead of the playhead).
const DEFAULT_FORWARD_HIGH_WATER_SECONDS = 30; // production pauses at/above this fill
const DEFAULT_FORWARD_LOW_WATER_SECONDS = 15; // production resumes at/below this fill
// Hard ceiling on retained decoded float PCM, independent of the time window (~96 MB).
const DEFAULT_MAX_DECODED_BYTES = 96 * 1024 * 1024;
// Width of one 32-bit float sample, in bytes (evaluates to 4).
const BYTES_PER_FLOAT_SAMPLE = Float32Array.BYTES_PER_ELEMENT;
|
||||||
|
|
||||||
interface ScheduledSource {
|
interface ScheduledSource {
|
||||||
source: AudioBufferSourceNode;
|
source: AudioBufferSourceNode;
|
||||||
bufferIndex: number;
|
bufferIndex: number;
|
||||||
@@ -66,6 +93,20 @@ export class PlaybackScheduler {
|
|||||||
// 21.2 will drive this from the window policy. Not a hardcoded eviction decision.
|
// 21.2 will drive this from the window policy. Not a hardcoded eviction decision.
|
||||||
private backRetainSeconds: number = DEFAULT_BACK_RETAIN_SECONDS;
|
private backRetainSeconds: number = DEFAULT_BACK_RETAIN_SECONDS;
|
||||||
|
|
||||||
|
// Forward back-pressure water-marks + the OQ3 hard byte ceiling (Phase 21.2). This is the
|
||||||
|
// single shared window policy (OQ6): both producers call evaluateProductionPause() and honor it
|
||||||
|
// in their own way — the C# read loop stops ReadAsync, the Opus feed stops demux/decode.
|
||||||
|
private forwardHighWaterSeconds: number = DEFAULT_FORWARD_HIGH_WATER_SECONDS;
|
||||||
|
private forwardLowWaterSeconds: number = DEFAULT_FORWARD_LOW_WATER_SECONDS;
|
||||||
|
private maxDecodedBytes: number = DEFAULT_MAX_DECODED_BYTES;
|
||||||
|
|
||||||
|
// Hysteresis latch for the production pause. Once forward fill crosses the high-water mark we
|
||||||
|
// stay paused until it drains below the low-water mark, so the two producers do not flap
|
||||||
|
// on/off around a single threshold (and a paused producer does not resume for one chunk only
|
||||||
|
// to re-pause immediately). False until first crossing; flips on the band edges.
|
||||||
|
// Mutated by evaluateProductionPause() — named to signal the state-advance on each call.
|
||||||
|
private productionPaused_: boolean = false;
|
||||||
|
|
||||||
// Callbacks
|
// Callbacks
|
||||||
public onPlaybackEnded: (() => void) | null = null;
|
public onPlaybackEnded: (() => void) | null = null;
|
||||||
|
|
||||||
@@ -132,6 +173,73 @@ export class PlaybackScheduler {
|
|||||||
this.backRetainSeconds = Math.max(0, seconds);
|
this.backRetainSeconds = Math.max(0, seconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configure the forward back-pressure water-marks (seconds of decoded lookahead) and the OQ3
|
||||||
|
* hard byte ceiling. Provisional config seam — 21.4 tunes the numbers. Low is clamped below
|
||||||
|
* high so the hysteresis band is always valid; non-positive byte cap disables the OQ3 guard.
|
||||||
|
*/
|
||||||
|
setForwardWindow(highWaterSeconds: number, lowWaterSeconds: number, maxDecodedBytes: number): void {
    // A NaN setting would silently switch back-pressure off: `lookahead >= NaN` and
    // `NaN > 0` are both false, so neither the time window nor the byte ceiling could ever
    // trip. Math.max(0, NaN) is NaN, so the clamp alone does not protect against it.
    // A NaN argument therefore leaves the corresponding setting unchanged.
    const clampedHigh = Math.max(0, highWaterSeconds);
    if (!Number.isNaN(clampedHigh)) {
        this.forwardHighWaterSeconds = clampedHigh;
    }

    const clampedLow = Math.max(0, lowWaterSeconds);
    if (!Number.isNaN(clampedLow)) {
        this.forwardLowWaterSeconds = clampedLow;
    }

    // Keep the hysteresis band valid (low <= high) against whichever high-water mark is now
    // in effect, including the case where only one of the two marks was updated.
    this.forwardLowWaterSeconds = Math.min(this.forwardLowWaterSeconds, this.forwardHighWaterSeconds);

    // A non-positive cap is stored as-is: the pause decision treats it as "guard disabled".
    if (typeof maxDecodedBytes === 'number' && !Number.isNaN(maxDecodedBytes)) {
        this.maxDecodedBytes = maxDecodedBytes;
    }
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Seconds of decoded audio sitting AHEAD of the current playhead — the forward fill. This is
|
||||||
|
* the single back-pressure signal (OQ7): the absolute end time of the last decoded buffer
|
||||||
|
* minus the current playback position. Never negative (clamped at 0 when the playhead has
|
||||||
|
* caught up to or passed the decoded tail).
|
||||||
|
*/
|
||||||
|
getForwardLookaheadSeconds(): number {
    // Absolute position of the decoded tail: decoded duration shifted by the playback offset.
    const decodedDuration = this.getTotalDuration();
    const decodedTail = decodedDuration + this.playbackOffset;

    // Distance from the playhead to that tail, floored at zero.
    const playhead = this.getCurrentPosition();
    return Math.max(0, decodedTail - playhead);
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Estimated bytes of decoded float PCM currently retained (OQ3 input). Web Audio AudioBuffers
|
||||||
|
* are 32-bit float per sample per channel; frames = duration × sampleRate. Summed across the
|
||||||
|
* retained buffers only — evicted buffers are already reclaimed, so this tracks the live
|
||||||
|
* footprint, not the whole track.
|
||||||
|
*/
|
||||||
|
getDecodedByteEstimate(): number {
    // Count samples (frames × channels) over every retained buffer, then convert to bytes once.
    let retainedSamples = 0;
    for (const decoded of this.buffers) {
        retainedSamples += decoded.length * decoded.numberOfChannels;
    }
    return retainedSamples * BYTES_PER_FLOAT_SAMPLE;
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The single shared production-pause decision (Phase 21.2, OQ6/OQ7). Both producers — the C#
|
||||||
|
* read loop (21.2a) and the Opus demux/decode feed (21.2b) — call this and stop producing
|
||||||
|
* while it returns true. Hysteresis: pause when forward lookahead reaches the high-water mark
|
||||||
|
* OR the decoded byte estimate exceeds the OQ3 ceiling; resume only once forward lookahead has
|
||||||
|
* drained to or below the low-water mark AND the byte estimate is no longer over the ceiling. The
|
||||||
|
* byte-ceiling test has no separate low-water band — it is the hard guard rail, so it releases
|
||||||
|
* as soon as eviction brings the footprint back under the cap.
|
||||||
|
*
|
||||||
|
* Named `evaluateProductionPause` (not `isProductionPaused`) because each call may ADVANCE the
|
||||||
|
* hysteresis latch (`productionPaused_`), making it a state-advancing evaluation, not a pure
|
||||||
|
* read. `AudioPlayer.isProductionPaused()` is the pure-predicate wrapper exposed to callers
|
||||||
|
* outside the scheduler.
|
||||||
|
*/
|
||||||
|
evaluateProductionPause(): boolean {
    const forwardFill = this.getForwardLookaheadSeconds();
    // The byte estimate is only computed when the ceiling is enabled (positive cap).
    const byteCapExceeded = this.maxDecodedBytes > 0 && this.getDecodedByteEstimate() > this.maxDecodedBytes;

    if (!this.productionPaused_) {
        // Running: latch the pause when either limit is hit.
        if (byteCapExceeded || forwardFill >= this.forwardHighWaterSeconds) {
            this.productionPaused_ = true;
        }
        return this.productionPaused_;
    }

    // Paused: release only when the forward fill is at or below low-water and the byte
    // footprint is no longer over the ceiling.
    if (!byteCapExceeded && forwardFill <= this.forwardLowWaterSeconds) {
        this.productionPaused_ = false;
    }
    return this.productionPaused_;
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Drop already-played buffers from the front of the array, reclaiming their decoded float
|
* Drop already-played buffers from the front of the array, reclaiming their decoded float
|
||||||
* memory, and advance the time anchor so all position/index bookkeeping stays exact.
|
* memory, and advance the time anchor so all position/index bookkeeping stays exact.
|
||||||
@@ -418,6 +526,9 @@ export class PlaybackScheduler {
|
|||||||
this.nextBufferIndex = 0;
|
this.nextBufferIndex = 0;
|
||||||
this.nextScheduleTime = 0;
|
this.nextScheduleTime = 0;
|
||||||
this.playbackOffset = 0;
|
this.playbackOffset = 0;
|
||||||
|
// Release the back-pressure latch — a fresh stream must start unthrottled so its first
|
||||||
|
// chunks decode immediately (C2: no throttle-induced first-audio stall).
|
||||||
|
this.productionPaused_ = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -432,6 +543,9 @@ export class PlaybackScheduler {
|
|||||||
this.nextBufferIndex = 0;
|
this.nextBufferIndex = 0;
|
||||||
this.nextScheduleTime = 0;
|
this.nextScheduleTime = 0;
|
||||||
// Note: playbackOffset is NOT reset - it will be set by the caller
|
// Note: playbackOffset is NOT reset - it will be set by the caller
|
||||||
|
// Release the back-pressure latch — the post-seek continuation must refill from the new
|
||||||
|
// offset without inheriting the pre-seek paused state.
|
||||||
|
this.productionPaused_ = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -117,6 +117,14 @@ const DeepDrftAudio = {
|
|||||||
return player?.calculateByteOffset(positionSeconds) ?? 0;
|
return player?.calculateByteOffset(positionSeconds) ?? 0;
|
||||||
},
|
},
|
||||||
|
|
||||||
|
// Phase 21.2a back-pressure poll: the C# read loop calls this WHILE throttled to learn when
|
||||||
|
// the scheduler has drained below low-water and reading may resume. A missing player reads as
|
||||||
|
// "not paused" so a torn-down player never wedges a loop that is already exiting.
|
||||||
|
isProductionPaused: (playerId: string): boolean => {
    const player = audioPlayers.get(playerId);
    // An unknown player id reports "not paused".
    if (player === undefined || player === null) {
        return false;
    }
    return player.isProductionPaused() ?? false;
},
|
||||||
|
|
||||||
reinitializeFromOffset: (playerId: string, totalStreamLength: number, seekPosition: number): AudioResult => {
|
reinitializeFromOffset: (playerId: string, totalStreamLength: number, seekPosition: number): AudioResult => {
|
||||||
const player = audioPlayers.get(playerId);
|
const player = audioPlayers.get(playerId);
|
||||||
if (!player) return { success: false, error: 'Player not found' };
|
if (!player) return { success: false, error: 'Player not found' };
|
||||||
|
|||||||
Reference in New Issue
Block a user