diff --git a/.changeset/false-interruption-audio-replay.md b/.changeset/false-interruption-audio-replay.md new file mode 100644 index 000000000..fd9171c1d --- /dev/null +++ b/.changeset/false-interruption-audio-replay.md @@ -0,0 +1,9 @@ +--- +'@livekit/agents': patch +--- + +fix(room_io): stop dropping audio on false interruptions + +`ParticipantAudioOutput.pause()` cleared the entire native `AudioSource` queue, permanently discarding up to `queueSizeMs` (rtc-node default: 1000ms) of generated-but-unplayed audio. On a false interruption (pause then resume) those frames were never replayed, so agent speech was lost mid-sentence from both the live call and the observability recording. The output now keeps a rolling window of recently pushed frames, captures the unplayed tail on pause, and replays it on resume, while still discarding it on a real interruption (`clearBuffer()`). + +Behavioral change: the room audio output now defaults `queueSizeMs` to 200ms (matching Python), down from the rtc-node `AudioSource` default of 1000ms, to keep the playout queue close to realtime. Bursty TTS providers that previously relied on the larger prebuffer can pass an explicit `queueSizeMs` via `RoomOutputOptions`. diff --git a/agents/src/voice/room_io/_output.test.ts b/agents/src/voice/room_io/_output.test.ts index 24b03ba8d..853debeb5 100644 --- a/agents/src/voice/room_io/_output.test.ts +++ b/agents/src/voice/room_io/_output.test.ts @@ -167,6 +167,10 @@ describe('ParticipantAudioOutput captureFrame segment accounting', () => { playbackFinishedCount: number; playbackFinishedFuture: Future; onPlaybackStarted: (createdAt: number) => void; + options: { queueSizeMs?: number }; + recentFrames: unknown[]; + recentFramesMs: number; + replayFrames: unknown[]; audioSource: { clearQueue: () => void; captureFrame: (frame: CaptureFrameArg) => Promise; @@ -187,6 +191,11 @@ describe('ParticipantAudioOutput captureFrame segment accounting', () => { output.playbackFinishedCount = 0; output.playbackFinishedFuture = new Future(); output.onPlaybackStarted = vi.fn(); + // Object.create bypasses the constructor's field initializers; mirror them. + output.options = { queueSizeMs: 1000 }; + output.recentFrames = []; + output.recentFramesMs = 0; + output.replayFrames = []; output.audioSource = { clearQueue: vi.fn(), captureFrame: vi.fn(async () => {}) }; return output; }; @@ -220,3 +229,174 @@ describe('ParticipantAudioOutput captureFrame segment accounting', () => { expect(output.pushedDuration).toBeGreaterThan(0); }); }); + +/** + * Regression tests for the false-interruption audio loss fix. + * + * Before the fix, pause() called clearQueue() which permanently dropped every + * frame already pushed to the native AudioSource queue (up to queueSizeMs). On a + * false interruption (pause then resume) the agent never replayed them, so up to + * ~1s of audio (rtc-node default queue) vanished from both the call and the + * recording. The output now keeps a rolling window of recently pushed frames, + * captures the unplayed tail on pause(), and replays it on the next captureFrame + * after resume() — while discarding it on a real interruption (clearBuffer()). + */ +describe('ParticipantAudioOutput false-interruption replay', () => { + const FRAME_MS = 20; + const SR = 48000; + const SPF = (SR * FRAME_MS) / 1000; + + type ReplayOutput = ParticipantAudioOutput & { + startedFuture: Future; + playbackEnabledFuture: Future; + interruptedFuture: Future; + firstFrameEmitted: boolean; + pushedDuration: number; + _capturing: boolean; + playbackSegmentsCount: number; + playbackFinishedCount: number; + playbackFinishedFuture: Future; + onPlaybackStarted: (createdAt: number) => void; + options: { queueSizeMs?: number }; + recentFrames: unknown[]; + recentFramesMs: number; + replayFrames: unknown[]; + audioSource: { + clearQueue: () => void; + captureFrame: (frame: CaptureFrameArg) => Promise; + queuedDuration: number; + }; + }; + + // Tag each frame by id in data[0] so captured ids reveal exact ordering, lost + // frames (missing id), and replayed frames (duplicate id). + const frameOf = (id: number): CaptureFrameArg => { + const data = new Int16Array(SPF); + data[0] = id; + return { samplesPerChannel: SPF, sampleRate: SR, data } as unknown as CaptureFrameArg; + }; + + const makeOutput = (queueSizeMs: number, captured: number[]): ReplayOutput => { + const output = Object.create(ParticipantAudioOutput.prototype) as ReplayOutput; + output.startedFuture = new Future(); + output.startedFuture.resolve(); + output.playbackEnabledFuture = new Future(); + output.playbackEnabledFuture.resolve(); + output.interruptedFuture = new Future(); + output.firstFrameEmitted = false; + output.pushedDuration = 0; + output._capturing = false; + output.playbackSegmentsCount = 0; + output.playbackFinishedCount = 0; + output.playbackFinishedFuture = new Future(); + output.onPlaybackStarted = vi.fn(); + // Object.create bypasses the constructor's field initializers; mirror them. + output.options = { queueSizeMs }; + output.recentFrames = []; + output.recentFramesMs = 0; + output.replayFrames = []; + output.audioSource = { + clearQueue: vi.fn(), + queuedDuration: 0, + captureFrame: vi.fn(async (frame: CaptureFrameArg) => { + captured.push((frame as unknown as { data: Int16Array }).data[0]!); + }), + }; + return output; + }; + + it('replays the unplayed tail on resume (false interruption) — zero loss', async () => { + const captured: number[] = []; + const output = makeOutput(100, captured); + + for (let i = 0; i < 10; i++) { + await output.captureFrame(frameOf(i)); + } + + // 100ms == 5 frames still queued (unplayed) when the false interruption hits. + output.audioSource.queuedDuration = 100; + output.pause(); + output.resume(); + + await output.captureFrame(frameOf(10)); + + // initial 0..9, then the unplayed tail 5..9 replayed, then 10 — nothing lost. + expect(captured).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 5, 6, 7, 8, 9, 10]); + }); + + it('discards the unplayed tail on clearBuffer (real interruption) — no replay', async () => { + const captured: number[] = []; + const output = makeOutput(100, captured); + + for (let i = 0; i < 10; i++) { + await output.captureFrame(frameOf(i)); + } + + output.audioSource.queuedDuration = 100; + output.pause(); + output.clearBuffer(); // real interruption: the user cut the agent off + output.resume(); + + await output.captureFrame(frameOf(10)); + + expect(captured).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + }); + + it('does not replay when nothing was queued at pause', async () => { + const captured: number[] = []; + const output = makeOutput(100, captured); + + for (let i = 0; i < 10; i++) { + await output.captureFrame(frameOf(i)); + } + + output.audioSource.queuedDuration = 0; + output.pause(); + output.resume(); + + await output.captureFrame(frameOf(10)); + + expect(captured).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + }); + + // A false interruption at the very end of an utterance has no following + // captureFrame() in the same segment to consume the replay tail. The segment + // completes non-interrupted (pause's clearQueue lets waitForPlayout resolve), + // so the tail must be dropped at segment end — otherwise it leaks into the + // start of the next utterance. + it('drops the unplayed tail at segment end (end-of-utterance false interruption)', async () => { + const captured: number[] = []; + const output = makeOutput(100, captured) as ReplayOutput & { + onPlaybackFinished: (event: { playbackPosition: number; interrupted: boolean }) => void; + waitForPlayoutTask: (abortController: AbortController) => Promise; + audioSource: ReplayOutput['audioSource'] & { waitForPlayout: () => Promise }; + }; + output.onPlaybackFinished = vi.fn(); + let resolvePlayout!: () => void; + const playout = new Promise((resolve) => { + resolvePlayout = resolve; + }); + output.audioSource.waitForPlayout = () => playout; + + for (let i = 0; i < 10; i++) { + await output.captureFrame(frameOf(i)); + } + + // False interruption right at the end: the tail (5..9) is captured for replay. + output.audioSource.queuedDuration = 100; + output.pause(); + expect(output.replayFrames.length).toBe(5); + + // Segment completes normally: clearQueue (from pause) let waitForPlayout + // resolve and no clearBuffer fired, so interrupted === false. + const task = output.waitForPlayoutTask(new AbortController()); + resolvePlayout(); + await task; + expect(output.replayFrames.length).toBe(0); + + // Next utterance must not be prefixed with the stale tail. + output.resume(); + await output.captureFrame(frameOf(10)); + expect(captured).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + }); +}); diff --git a/agents/src/voice/room_io/_output.ts b/agents/src/voice/room_io/_output.ts index 11dd8eb71..410452238 100644 --- a/agents/src/voice/room_io/_output.ts +++ b/agents/src/voice/room_io/_output.ts @@ -385,6 +385,18 @@ export class ParticipantAudioOutput extends AudioOutput { /** Gate held closed while the output is paused; frame forwarding awaits it. */ private playbackEnabledFuture: Future = new Future(); + // Rolling window of the most recently pushed frames (covering ~queueSizeMs), + // used to recover the unplayed tail that clearQueue() drops on pause(). On a + // false interruption (pause -> resume) these are replayed so no audio is lost; + // on a real interruption (clearBuffer) they are discarded. + private recentFrames: AudioFrame[] = []; + private recentFramesMs: number = 0; + private replayFrames: AudioFrame[] = []; + + private static frameMs(frame: AudioFrame): number { + return (frame.samplesPerChannel / frame.sampleRate) * 1000; + } + constructor(room: Room, options: AudioOutputOptions) { super(options.sampleRate, undefined, { pause: true }); this.room = room; @@ -401,6 +413,19 @@ export class ParticipantAudioOutput extends AudioOutput { if (this.playbackEnabledFuture.done) { this.playbackEnabledFuture = new Future(); } + // Capture the unplayed tail (what clearQueue is about to drop) so a false + // interruption can replay it on resume instead of losing it forever. + const queuedMs = this.audioSource.queuedDuration; + if (queuedMs > 0 && this.recentFrames.length > 0) { + const tail: AudioFrame[] = []; + let acc = 0; + for (let i = this.recentFrames.length - 1; i >= 0 && acc < queuedMs; i--) { + const f = this.recentFrames[i]!; + tail.unshift(f); + acc += ParticipantAudioOutput.frameMs(f); + } + this.replayFrames = tail; + } // Drop already-buffered audio so playback stops promptly instead of draining the prebuffer. this.audioSource.clearQueue(); super.pause(); @@ -434,6 +459,18 @@ export class ParticipantAudioOutput extends AudioOutput { } } + // Replay the unplayed tail that pause() dropped, when the pause ended in a + // resume (false interruption). These frames were already counted in + // pushedDuration on their first push, so don't recount them. + if (this.replayFrames.length > 0) { + const replay = this.replayFrames; + this.replayFrames = []; + for (const rf of replay) { + await this.audioSource.captureFrame(rf); + this.trackRecentFrame(rf); + } + } + // Count the playback segment only after the pause/interrupt gate above. super.captureFrame // bumps playbackSegmentsCount; if a frame interrupted-while-paused bailed at the gate after // that bump, the count would strand ahead of playbackFinishedCount and the next @@ -448,6 +485,25 @@ export class ParticipantAudioOutput extends AudioOutput { // TODO(AJS-102): use frame.durationMs once available in rtc-node this.pushedDuration += frame.samplesPerChannel / frame.sampleRate; await this.audioSource.captureFrame(frame); + this.trackRecentFrame(frame); + } + + // Maintain a rolling window of recently pushed frames covering ~queueSizeMs, + // enough to recover the unplayed tail (<= queuedDuration) that pause() drops. + private trackRecentFrame(frame: AudioFrame): void { + // Keep a little headroom over queueSizeMs: queuedDuration can momentarily + // exceed the nominal queue size by up to one frame, and we must retain enough + // history to recover the entire unplayed tail. + const cap = (this.options.queueSizeMs ?? 1000) + 200; + this.recentFrames.push(frame); + this.recentFramesMs += ParticipantAudioOutput.frameMs(frame); + while ( + this.recentFrames.length > 1 && + this.recentFramesMs - ParticipantAudioOutput.frameMs(this.recentFrames[0]!) >= cap + ) { + const dropped = this.recentFrames.shift()!; + this.recentFramesMs -= ParticipantAudioOutput.frameMs(dropped); + } } private async waitForPlayoutTask(abortController: AbortController): Promise { @@ -483,6 +539,14 @@ export class ParticipantAudioOutput extends AudioOutput { this.pushedDuration = 0; this.firstFrameEmitted = false; + // Segment finished: drop the rolling window and any pending replay tail. A + // replay tail belongs to the segment that just ended: mid-utterance false + // interruptions already consume it on the next captureFrame() before flush, + // so anything still pending here is end-of-utterance and must be dropped — + // otherwise it leaks into the start of the next utterance. + this.recentFrames = []; + this.recentFramesMs = 0; + this.replayFrames = []; this.onPlaybackFinished({ playbackPosition: pushedDuration, @@ -522,6 +586,7 @@ export class ParticipantAudioOutput extends AudioOutput { } clearBuffer(): void { + this.replayFrames = []; // Signal interruption even if no frame has been pushed yet, so a gated captureFrame can bail. if (!this.interruptedFuture.done) { this.interruptedFuture.resolve(); diff --git a/agents/src/voice/room_io/room_io.ts b/agents/src/voice/room_io/room_io.ts index ed5315bf7..ca2d3962f 100644 --- a/agents/src/voice/room_io/room_io.ts +++ b/agents/src/voice/room_io/room_io.ts @@ -107,7 +107,9 @@ export interface RoomOutputOptions { /** Maximum queue size in milliseconds for the audio output buffer. When TTS generates audio faster than real-time, a larger queue prevents early frames from being discarded by the ring buffer. - Defaults to the AudioSource internal default (1000ms). + Defaults to 200ms (matching Python), down from the rtc-node AudioSource + internal default of 1000ms. Raise this if a bursty TTS provider drops + frames with the smaller prebuffer. */ queueSizeMs?: number; /** Send the transcription as a JSON dict for each chunk on the `lk.transcription` @@ -137,6 +139,10 @@ const DEFAULT_ROOM_OUTPUT_OPTIONS: RoomOutputOptions = { syncTranscription: true, audioPublishOptions: new TrackPublishOptions({ source: TrackSource.SOURCE_MICROPHONE }), jsonFormat: false, + // Match Python (_output.py: queue_size_ms=200). The rtc-node AudioSource + // default is 1000ms; a smaller prebuffer keeps the playout queue close to + // realtime so interruptions take effect promptly. + queueSizeMs: 200, }; export class RoomIO {