21.2 review remediation: pause-spin, OQ7 comment, rename, C2 cross-check
Skip the back-pressure interop poll while paused (UC5). Document complete() draining the stash in full by design. Rename scheduler isProductionPaused to evaluateProductionPause (latch-advancing); window exposure name unchanged.
This commit is contained in:
@@ -487,7 +487,12 @@ public class StreamingAudioPlayerService : AudioPlayerService, IStreamingPlayerS
|
|||||||
// no separate cancellation path, no stale read racing a reinit.
|
// no separate cancellation path, no stale read racing a reinit.
|
||||||
if (chunkResult.ProductionPaused)
|
if (chunkResult.ProductionPaused)
|
||||||
{
|
{
|
||||||
while (await _audioInterop.IsProductionPaused(PlayerId))
|
// UC5: while the user is paused, the playhead is frozen so forward lookahead
|
||||||
|
// never shrinks and the poll would spin indefinitely. Wait here until playback
|
||||||
|
// resumes (IsPaused clears) OR the fill drains on its own. Cancellation is
|
||||||
|
// unchanged: a track switch/seek/stop while paused still throws OCE and unwinds
|
||||||
|
// through the existing drain discipline (C6) — no weakening of the cancel path.
|
||||||
|
while (IsPaused || await _audioInterop.IsProductionPaused(PlayerId))
|
||||||
{
|
{
|
||||||
cancellationToken.ThrowIfCancellationRequested();
|
cancellationToken.ThrowIfCancellationRequested();
|
||||||
await Task.Delay(BackpressurePollMs, cancellationToken);
|
await Task.Delay(BackpressurePollMs, cancellationToken);
|
||||||
|
|||||||
@@ -144,7 +144,7 @@ export class AudioPlayer {
|
|||||||
this.opusDecoder = new OpusStreamDecoder(
|
this.opusDecoder = new OpusStreamDecoder(
|
||||||
this.contextManager,
|
this.contextManager,
|
||||||
this.pendingOpusSidecar,
|
this.pendingOpusSidecar,
|
||||||
() => this.scheduler.isProductionPaused());
|
() => this.scheduler.evaluateProductionPause());
|
||||||
return { success: true };
|
return { success: true };
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -275,7 +275,7 @@ export class AudioPlayer {
|
|||||||
headerParsed,
|
headerParsed,
|
||||||
bufferCount: this.scheduler.getBufferCount(),
|
bufferCount: this.scheduler.getBufferCount(),
|
||||||
duration: this.duration,
|
duration: this.duration,
|
||||||
productionPaused: this.scheduler.isProductionPaused()
|
productionPaused: this.scheduler.evaluateProductionPause()
|
||||||
};
|
};
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
return { success: false, error: (error as Error).message };
|
return { success: false, error: (error as Error).message };
|
||||||
@@ -320,7 +320,7 @@ export class AudioPlayer {
|
|||||||
headerParsed: this.streamDecoder.headerParsed,
|
headerParsed: this.streamDecoder.headerParsed,
|
||||||
bufferCount: this.scheduler.getBufferCount(),
|
bufferCount: this.scheduler.getBufferCount(),
|
||||||
duration: this.duration,
|
duration: this.duration,
|
||||||
productionPaused: this.scheduler.isProductionPaused()
|
productionPaused: this.scheduler.evaluateProductionPause()
|
||||||
};
|
};
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
return { success: false, error: (error as Error).message };
|
return { success: false, error: (error as Error).message };
|
||||||
@@ -529,7 +529,7 @@ export class AudioPlayer {
|
|||||||
* interop hop until back-pressure actually engages.
|
* interop hop until back-pressure actually engages.
|
||||||
*/
|
*/
|
||||||
isProductionPaused(): boolean {
|
isProductionPaused(): boolean {
|
||||||
return this.scheduler.isProductionPaused();
|
return this.scheduler.evaluateProductionPause();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -191,6 +191,12 @@ export class OpusStreamDecoder implements IStreamingDecoder {
|
|||||||
// End-of-stream may arrive while still throttled with bytes stashed (e.g. a short track
|
// End-of-stream may arrive while still throttled with bytes stashed (e.g. a short track
|
||||||
// that finished sending before the scheduler drained). Configure if needed and replay the
|
// that finished sending before the scheduler drained). Configure if needed and replay the
|
||||||
// stash so the tail is decoded before flush — otherwise the final seconds would be lost.
|
// stash so the tail is decoded before flush — otherwise the final seconds would be lost.
|
||||||
|
//
|
||||||
|
// OQ7/AC1-Opus precision note: the stash here is drained in full without a water-mark
|
||||||
|
// check. This is intentionally correct: the stream has ended — you cannot back-pressure a
|
||||||
|
// finished stream — and the remainder is tail-only (bounded by whatever the throttled C#
|
||||||
|
// loop left in flight, which is at most one push() worth of bytes). Adding a water-mark
|
||||||
|
// gate to complete() would silently drop the track's tail and is therefore wrong.
|
||||||
if (this.pendingBytes.length > 0) {
|
if (this.pendingBytes.length > 0) {
|
||||||
if (await this.ensureConfigured()) {
|
if (await this.ensureConfigured()) {
|
||||||
const stashed = this.pendingBytes;
|
const stashed = this.pendingBytes;
|
||||||
|
|||||||
@@ -352,7 +352,7 @@ function fillAndStart(s: PlaybackScheduler, cm: FakeContextManager, count: numbe
|
|||||||
}
|
}
|
||||||
|
|
||||||
// High-water reached → production pauses; the signal reflects the forward lookahead.
|
// High-water reached → production pauses; the signal reflects the forward lookahead.
|
||||||
test('isProductionPaused latches true when forward lookahead reaches high-water', () => {
|
test('evaluateProductionPause latches true when forward lookahead reaches high-water', () => {
|
||||||
const cm = new FakeContextManager();
|
const cm = new FakeContextManager();
|
||||||
const s = makeScheduler(cm);
|
const s = makeScheduler(cm);
|
||||||
s.setForwardWindow(10, 5, 0); // high 10s, low 5s, byte cap disabled
|
s.setForwardWindow(10, 5, 0); // high 10s, low 5s, byte cap disabled
|
||||||
@@ -360,45 +360,45 @@ test('isProductionPaused latches true when forward lookahead reaches high-water'
|
|||||||
|
|
||||||
cm.now = 0; // playhead at 0 → forward lookahead = 40s ≥ 10s high-water
|
cm.now = 0; // playhead at 0 → forward lookahead = 40s ≥ 10s high-water
|
||||||
assertEqual(s.getForwardLookaheadSeconds(), 40, 'lookahead is full decoded tail at t=0');
|
assertEqual(s.getForwardLookaheadSeconds(), 40, 'lookahead is full decoded tail at t=0');
|
||||||
assertEqual(s.isProductionPaused(), true, 'pauses at/above high-water');
|
assertEqual(s.evaluateProductionPause(), true, 'pauses at/above high-water');
|
||||||
});
|
});
|
||||||
|
|
||||||
// Below high-water but above low-water while NOT yet paused → stays unpaused (no premature pause).
|
// Below high-water but above low-water while NOT yet paused → stays unpaused (no premature pause).
|
||||||
test('isProductionPaused stays false in the hysteresis band before the high-water crossing', () => {
|
test('evaluateProductionPause stays false in the hysteresis band before the high-water crossing', () => {
|
||||||
const cm = new FakeContextManager();
|
const cm = new FakeContextManager();
|
||||||
const s = makeScheduler(cm);
|
const s = makeScheduler(cm);
|
||||||
s.setForwardWindow(10, 5, 0);
|
s.setForwardWindow(10, 5, 0);
|
||||||
fillAndStart(s, cm, 8); // 8s decoded
|
fillAndStart(s, cm, 8); // 8s decoded
|
||||||
|
|
||||||
cm.now = 0; // lookahead 8s: between low(5) and high(10), never latched → unpaused
|
cm.now = 0; // lookahead 8s: between low(5) and high(10), never latched → unpaused
|
||||||
assertEqual(s.isProductionPaused(), false, 'no pause until high-water is actually reached');
|
assertEqual(s.evaluateProductionPause(), false, 'no pause until high-water is actually reached');
|
||||||
});
|
});
|
||||||
|
|
||||||
// Hysteresis: once paused at high-water, stays paused through the band until lookahead drains
|
// Hysteresis: once paused at high-water, stays paused through the band until lookahead drains
|
||||||
// below low-water, then resumes. Drain is modeled by advancing the clock (playhead moves forward,
|
// below low-water, then resumes. Drain is modeled by advancing the clock (playhead moves forward,
|
||||||
// shrinking forward lookahead).
|
// shrinking forward lookahead).
|
||||||
test('isProductionPaused holds through the band and resumes only below low-water', () => {
|
test('evaluateProductionPause holds through the band and resumes only below low-water', () => {
|
||||||
const cm = new FakeContextManager();
|
const cm = new FakeContextManager();
|
||||||
const s = makeScheduler(cm);
|
const s = makeScheduler(cm);
|
||||||
s.setForwardWindow(10, 5, 0);
|
s.setForwardWindow(10, 5, 0);
|
||||||
fillAndStart(s, cm, 40); // track [0,40)
|
fillAndStart(s, cm, 40); // track [0,40)
|
||||||
|
|
||||||
cm.now = 0;
|
cm.now = 0;
|
||||||
assertEqual(s.isProductionPaused(), true, 'latched at high-water (40s ahead)');
|
assertEqual(s.evaluateProductionPause(), true, 'latched at high-water (40s ahead)');
|
||||||
|
|
||||||
// Playhead at 32 → lookahead 8s: in the band (5..10) → must STAY paused (hysteresis).
|
// Playhead at 32 → lookahead 8s: in the band (5..10) → must STAY paused (hysteresis).
|
||||||
cm.now = 32;
|
cm.now = 32;
|
||||||
assertEqual(s.getForwardLookaheadSeconds(), 8, 'lookahead drained to 8s');
|
assertEqual(s.getForwardLookaheadSeconds(), 8, 'lookahead drained to 8s');
|
||||||
assertEqual(s.isProductionPaused(), true, 'still paused inside the band');
|
assertEqual(s.evaluateProductionPause(), true, 'still paused inside the band');
|
||||||
|
|
||||||
// Playhead at 36 → lookahead 4s ≤ low-water 5 → resume.
|
// Playhead at 36 → lookahead 4s ≤ low-water 5 → resume.
|
||||||
cm.now = 36;
|
cm.now = 36;
|
||||||
assertEqual(s.getForwardLookaheadSeconds(), 4, 'lookahead below low-water');
|
assertEqual(s.getForwardLookaheadSeconds(), 4, 'lookahead below low-water');
|
||||||
assertEqual(s.isProductionPaused(), false, 'resumes below low-water');
|
assertEqual(s.evaluateProductionPause(), false, 'resumes below low-water');
|
||||||
|
|
||||||
// Refill back over high-water re-latches (the next chunk would re-pause).
|
// Refill back over high-water re-latches (the next chunk would re-pause).
|
||||||
for (let i = 0; i < 20; i++) s.addBuffer(buf(1)); // +20s decoded ahead
|
for (let i = 0; i < 20; i++) s.addBuffer(buf(1)); // +20s decoded ahead
|
||||||
assertEqual(s.isProductionPaused(), true, 're-latches when fill exceeds high-water again');
|
assertEqual(s.evaluateProductionPause(), true, 're-latches when fill exceeds high-water again');
|
||||||
});
|
});
|
||||||
|
|
||||||
// OQ3 hard byte ceiling pauses production independent of the time window, and releases as soon as
|
// OQ3 hard byte ceiling pauses production independent of the time window, and releases as soon as
|
||||||
@@ -418,7 +418,7 @@ test('OQ3 byte ceiling pauses regardless of the time window', () => {
|
|||||||
if (s.getDecodedByteEstimate() <= perBuffer * 4) {
|
if (s.getDecodedByteEstimate() <= perBuffer * 4) {
|
||||||
throw new Error('test setup: byte estimate should exceed the cap');
|
throw new Error('test setup: byte estimate should exceed the cap');
|
||||||
}
|
}
|
||||||
assertEqual(s.isProductionPaused(), true, 'byte ceiling pauses even with a huge time window');
|
assertEqual(s.evaluateProductionPause(), true, 'byte ceiling pauses even with a huge time window');
|
||||||
});
|
});
|
||||||
|
|
||||||
// clear() / clearForSeek() release the latch so a fresh stream/seek starts unthrottled (C2).
|
// clear() / clearForSeek() release the latch so a fresh stream/seek starts unthrottled (C2).
|
||||||
@@ -428,17 +428,17 @@ test('clear and clearForSeek release the back-pressure latch (C2 latency parity)
|
|||||||
s.setForwardWindow(10, 5, 0);
|
s.setForwardWindow(10, 5, 0);
|
||||||
fillAndStart(s, cm, 40);
|
fillAndStart(s, cm, 40);
|
||||||
cm.now = 0;
|
cm.now = 0;
|
||||||
assertEqual(s.isProductionPaused(), true, 'latched');
|
assertEqual(s.evaluateProductionPause(), true, 'latched');
|
||||||
|
|
||||||
s.clear();
|
s.clear();
|
||||||
// After clear there are no buffers, lookahead is 0, and the latch is reset → unpaused.
|
// After clear there are no buffers, lookahead is 0, and the latch is reset → unpaused.
|
||||||
assertEqual(s.isProductionPaused(), false, 'clear resets the latch and empties fill');
|
assertEqual(s.evaluateProductionPause(), false, 'clear resets the latch and empties fill');
|
||||||
|
|
||||||
fillAndStart(s, cm, 40);
|
fillAndStart(s, cm, 40);
|
||||||
cm.now = 0;
|
cm.now = 0;
|
||||||
assertEqual(s.isProductionPaused(), true, 'latched again after refill');
|
assertEqual(s.evaluateProductionPause(), true, 'latched again after refill');
|
||||||
s.clearForSeek();
|
s.clearForSeek();
|
||||||
assertEqual(s.isProductionPaused(), false, 'clearForSeek resets the latch');
|
assertEqual(s.evaluateProductionPause(), false, 'clearForSeek resets the latch');
|
||||||
});
|
});
|
||||||
|
|
||||||
// --- run -------------------------------------------------------------------------------------
|
// --- run -------------------------------------------------------------------------------------
|
||||||
|
|||||||
@@ -46,9 +46,10 @@ const DEFAULT_BACK_RETAIN_SECONDS = 10;
|
|||||||
*
|
*
|
||||||
* Provisional time-based defaults (OQ1 — 21.4 tunes them):
|
* Provisional time-based defaults (OQ1 — 21.4 tunes them):
|
||||||
* - HIGH (30 s): the most decoded lookahead we hold ahead of the playhead before throttling.
|
* - HIGH (30 s): the most decoded lookahead we hold ahead of the playhead before throttling.
|
||||||
* Comfortably above the playback-start minimum (6 buffers ≈ a second or two), so C2 holds —
|
* Comfortably above the playback-start minimum (`AudioPlayer.minBuffersForPlayback = 6`
|
||||||
* first audio never waits on a throttle (the high-water is reached only well after playback
|
* buffers, each typically 0.06 – 1 s depending on format/chunk size; at most a few seconds
|
||||||
* is already running).
|
* even at the high end), so C2 holds — first audio never waits on a throttle (the high-water
|
||||||
|
* is reached only well after playback is already running).
|
||||||
* - LOW (15 s): resume producing here. Kept generous so the forward fill never drains to the
|
* - LOW (15 s): resume producing here. Kept generous so the forward fill never drains to the
|
||||||
* ~500 ms scheduler lookahead under normal network jitter (AC3 — no starvation).
|
* ~500 ms scheduler lookahead under normal network jitter (AC3 — no starvation).
|
||||||
*
|
*
|
||||||
@@ -93,7 +94,7 @@ export class PlaybackScheduler {
|
|||||||
private backRetainSeconds: number = DEFAULT_BACK_RETAIN_SECONDS;
|
private backRetainSeconds: number = DEFAULT_BACK_RETAIN_SECONDS;
|
||||||
|
|
||||||
// Forward back-pressure water-marks + the OQ3 hard byte ceiling (Phase 21.2). This is the
|
// Forward back-pressure water-marks + the OQ3 hard byte ceiling (Phase 21.2). This is the
|
||||||
// single shared window policy (OQ6): both producers read isProductionPaused() and honor it
|
// single shared window policy (OQ6): both producers call evaluateProductionPause() and honor it
|
||||||
// in their own way — the C# read loop stops ReadAsync, the Opus feed stops demux/decode.
|
// in their own way — the C# read loop stops ReadAsync, the Opus feed stops demux/decode.
|
||||||
private forwardHighWaterSeconds: number = DEFAULT_FORWARD_HIGH_WATER_SECONDS;
|
private forwardHighWaterSeconds: number = DEFAULT_FORWARD_HIGH_WATER_SECONDS;
|
||||||
private forwardLowWaterSeconds: number = DEFAULT_FORWARD_LOW_WATER_SECONDS;
|
private forwardLowWaterSeconds: number = DEFAULT_FORWARD_LOW_WATER_SECONDS;
|
||||||
@@ -103,6 +104,7 @@ export class PlaybackScheduler {
|
|||||||
// stay paused until it drains below the low-water mark, so the two producers do not flap
|
// stay paused until it drains below the low-water mark, so the two producers do not flap
|
||||||
// on/off around a single threshold (and a paused producer does not resume for one chunk only
|
// on/off around a single threshold (and a paused producer does not resume for one chunk only
|
||||||
// to re-pause immediately). False until first crossing; flips on the band edges.
|
// to re-pause immediately). False until first crossing; flips on the band edges.
|
||||||
|
// Mutated by evaluateProductionPause() — named to signal the state-advance on each call.
|
||||||
private productionPaused_: boolean = false;
|
private productionPaused_: boolean = false;
|
||||||
|
|
||||||
// Callbacks
|
// Callbacks
|
||||||
@@ -215,8 +217,13 @@ export class PlaybackScheduler {
|
|||||||
* drained below the low-water mark AND the byte estimate is back under the ceiling. The
|
* drained below the low-water mark AND the byte estimate is back under the ceiling. The
|
||||||
* byte-ceiling test has no separate low-water band — it is the hard guard rail, so it releases
|
* byte-ceiling test has no separate low-water band — it is the hard guard rail, so it releases
|
||||||
* as soon as eviction brings the footprint back under the cap.
|
* as soon as eviction brings the footprint back under the cap.
|
||||||
|
*
|
||||||
|
* Named `evaluateProductionPause` (not `isProductionPaused`) because each call may ADVANCE the
|
||||||
|
* hysteresis latch (`productionPaused_`), making it a state-advancing evaluation, not a pure
|
||||||
|
* read. `AudioPlayer.isProductionPaused()` is the pure-predicate wrapper exposed to callers
|
||||||
|
* outside the scheduler.
|
||||||
*/
|
*/
|
||||||
isProductionPaused(): boolean {
|
evaluateProductionPause(): boolean {
|
||||||
const lookahead = this.getForwardLookaheadSeconds();
|
const lookahead = this.getForwardLookaheadSeconds();
|
||||||
const overByteCeiling = this.maxDecodedBytes > 0 && this.getDecodedByteEstimate() > this.maxDecodedBytes;
|
const overByteCeiling = this.maxDecodedBytes > 0 && this.getDecodedByteEstimate() > this.maxDecodedBytes;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user