Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/false-interruption-audio-replay.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
'@livekit/agents': patch
---

fix(room_io): stop dropping audio on false interruptions

`ParticipantAudioOutput.pause()` cleared the entire native `AudioSource` queue, permanently discarding up to `queueSizeMs` (rtc-node default: 1000ms) of generated-but-unplayed audio. On a false interruption (pause then resume) those frames were never replayed, so agent speech was lost mid-sentence from both the live call and the observability recording. The output now keeps a rolling window of recently pushed frames, captures the unplayed tail on pause, and replays it on resume, while still discarding it on a real interruption (`clearBuffer()`).

Behavioral change: the room audio output now defaults `queueSizeMs` to 200ms (matching Python), down from the rtc-node `AudioSource` default of 1000ms, to keep the playout queue close to realtime. Bursty TTS providers that previously relied on the larger prebuffer can pass an explicit `queueSizeMs` via `RoomOutputOptions`.
180 changes: 180 additions & 0 deletions agents/src/voice/room_io/_output.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ describe('ParticipantAudioOutput captureFrame segment accounting', () => {
playbackFinishedCount: number;
playbackFinishedFuture: Future<void>;
onPlaybackStarted: (createdAt: number) => void;
options: { queueSizeMs?: number };
recentFrames: unknown[];
recentFramesMs: number;
replayFrames: unknown[];
audioSource: {
clearQueue: () => void;
captureFrame: (frame: CaptureFrameArg) => Promise<void>;
Expand All @@ -187,6 +191,11 @@ describe('ParticipantAudioOutput captureFrame segment accounting', () => {
output.playbackFinishedCount = 0;
output.playbackFinishedFuture = new Future<void>();
output.onPlaybackStarted = vi.fn();
// Object.create bypasses the constructor's field initializers; mirror them.
output.options = { queueSizeMs: 1000 };
output.recentFrames = [];
output.recentFramesMs = 0;
output.replayFrames = [];
output.audioSource = { clearQueue: vi.fn(), captureFrame: vi.fn(async () => {}) };
return output;
};
Expand Down Expand Up @@ -220,3 +229,174 @@ describe('ParticipantAudioOutput captureFrame segment accounting', () => {
expect(output.pushedDuration).toBeGreaterThan(0);
});
});

/**
* Regression tests for the false-interruption audio loss fix.
*
* Before the fix, pause() called clearQueue() which permanently dropped every
* frame already pushed to the native AudioSource queue (up to queueSizeMs). On a
* false interruption (pause then resume) the agent never replayed them, so up to
* ~1s of audio (rtc-node default queue) vanished from both the call and the
* recording. The output now keeps a rolling window of recently pushed frames,
* captures the unplayed tail on pause(), and replays it on the next captureFrame
* after resume() — while discarding it on a real interruption (clearBuffer()).
*/
describe('ParticipantAudioOutput false-interruption replay', () => {
// Duration of each synthetic test frame, in milliseconds.
const FRAME_MS = 20;
// Sample rate used for the synthetic test frames (samples per second).
const SR = 48000;
// Samples per channel in a single frame: 48000 * 20 / 1000 = 960.
const SPF = (SR * FRAME_MS) / 1000;

// Test-only view of ParticipantAudioOutput that exposes the internal members
// these tests assign or inspect directly. makeOutput() below populates every
// one of them by hand because the instance is built without the constructor.
type ReplayOutput = ParticipantAudioOutput & {
startedFuture: Future<void>;
playbackEnabledFuture: Future<void>;
interruptedFuture: Future<void>;
firstFrameEmitted: boolean;
pushedDuration: number;
_capturing: boolean;
playbackSegmentsCount: number;
playbackFinishedCount: number;
playbackFinishedFuture: Future<void>;
onPlaybackStarted: (createdAt: number) => void;
// Only queueSizeMs is needed from the options object in these tests.
options: { queueSizeMs?: number };
// State added by the false-interruption fix: the rolling window of pushed
// frames, its accumulated duration in ms, and the tail pending replay.
recentFrames: unknown[];
recentFramesMs: number;
replayFrames: unknown[];
// Stub standing in for the native audio source; queuedDuration is set by each
// test to simulate how much audio is still unplayed when pause() is called.
audioSource: {
clearQueue: () => void;
captureFrame: (frame: CaptureFrameArg) => Promise<void>;
queuedDuration: number;
};
};

// Builds a synthetic frame whose first sample carries `id`. Reading data[0] back
// from the frames the stub receives exposes push order, dropped frames (an id
// that never shows up), and replayed frames (an id that shows up twice).
const frameOf = (id: number): CaptureFrameArg => {
  const pcm = new Int16Array(SPF);
  pcm[0] = id;
  const tagged = { samplesPerChannel: SPF, sampleRate: SR, data: pcm };
  return tagged as unknown as CaptureFrameArg;
};

// Creates a ParticipantAudioOutput without running its constructor, fills in the
// state the constructor's field initializers would normally provide, and wires
// in a stub audio source that appends the id (data[0]) of every frame it
// receives to `captured`.
const makeOutput = (queueSizeMs: number, captured: number[]): ReplayOutput => {
  // Helper: a Future<void> that is already resolved, i.e. an open gate.
  const resolvedFuture = (): Future<void> => {
    const fut = new Future<void>();
    fut.resolve();
    return fut;
  };

  // Stub for audioSource.captureFrame: record the frame's id tag.
  const recordFrameId = async (frame: CaptureFrameArg): Promise<void> => {
    const { data } = frame as unknown as { data: Int16Array };
    captured.push(data[0]!);
  };

  const out = Object.create(ParticipantAudioOutput.prototype) as ReplayOutput;

  // Gates: started and playback-enabled are resolved; not interrupted.
  out.startedFuture = resolvedFuture();
  out.playbackEnabledFuture = resolvedFuture();
  out.interruptedFuture = new Future<void>();

  // Per-segment bookkeeping starts from a clean slate.
  out.firstFrameEmitted = false;
  out.pushedDuration = 0;
  out._capturing = false;
  out.playbackSegmentsCount = 0;
  out.playbackFinishedCount = 0;
  out.playbackFinishedFuture = new Future<void>();
  out.onPlaybackStarted = vi.fn();

  // Object.create skips the constructor's field initializers, so mirror them here.
  out.options = { queueSizeMs };
  out.recentFrames = [];
  out.recentFramesMs = 0;
  out.replayFrames = [];

  out.audioSource = {
    clearQueue: vi.fn(),
    queuedDuration: 0,
    captureFrame: vi.fn(recordFrameId),
  };
  return out;
};

it('replays the unplayed tail on resume (false interruption) — zero loss', async () => {
  const pushedIds: number[] = [];
  const out = makeOutput(100, pushedIds);

  // Push the first ten frames of the utterance (ids 0..9), one at a time.
  let id = 0;
  while (id < 10) {
    await out.captureFrame(frameOf(id));
    id += 1;
  }

  // Report 100ms (five 20ms frames) as still unplayed when the pause lands,
  // then resume straight away: a false interruption.
  out.audioSource.queuedDuration = 100;
  out.pause();
  out.resume();

  await out.captureFrame(frameOf(10));

  // First pass 0..9, then the unplayed tail 5..9 again, then 10 — nothing lost.
  const firstPass = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
  const replayedTail = [5, 6, 7, 8, 9];
  expect(pushedIds).toEqual([...firstPass, ...replayedTail, 10]);
});

it('discards the unplayed tail on clearBuffer (real interruption) — no replay', async () => {
  const pushedIds: number[] = [];
  const out = makeOutput(100, pushedIds);

  // Push the first ten frames of the utterance (ids 0..9), one at a time.
  let id = 0;
  while (id < 10) {
    await out.captureFrame(frameOf(id));
    id += 1;
  }

  // Same 100ms unplayed tail as the false-interruption case...
  out.audioSource.queuedDuration = 100;
  out.pause();
  // ...but this time the user really cut the agent off before the resume.
  out.clearBuffer();
  out.resume();

  await out.captureFrame(frameOf(10));

  // Each id appears exactly once: the captured tail was thrown away.
  const everyIdOnce = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
  expect(pushedIds).toEqual(everyIdOnce);
});

it('does not replay when nothing was queued at pause', async () => {
  const pushedIds: number[] = [];
  const out = makeOutput(100, pushedIds);

  // Push the first ten frames of the utterance (ids 0..9), one at a time.
  let id = 0;
  while (id < 10) {
    await out.captureFrame(frameOf(id));
    id += 1;
  }

  // The source reports an empty queue, so pause() has no unplayed tail to keep.
  out.audioSource.queuedDuration = 0;
  out.pause();
  out.resume();

  await out.captureFrame(frameOf(10));

  // Each id appears exactly once: there was nothing to replay.
  const everyIdOnce = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
  expect(pushedIds).toEqual(everyIdOnce);
});

// A false interruption at the very end of an utterance has no following
// captureFrame() in the same segment to consume the replay tail. The segment
// completes non-interrupted (pause's clearQueue lets waitForPlayout resolve),
// so the tail must be dropped at segment end — otherwise it leaks into the
// start of the next utterance.
it('drops the unplayed tail at segment end (end-of-utterance false interruption)', async () => {
const captured: number[] = [];
// Widen the test view with the extra members this test stubs or calls
// directly: the playback-finished callback, the playout-wait task, and a
// waitForPlayout() method on the stub audio source.
const output = makeOutput(100, captured) as ReplayOutput & {
onPlaybackFinished: (event: { playbackPosition: number; interrupted: boolean }) => void;
waitForPlayoutTask: (abortController: AbortController) => Promise<void>;
audioSource: ReplayOutput['audioSource'] & { waitForPlayout: () => Promise<void> };
};
output.onPlaybackFinished = vi.fn();
// Stub waitForPlayout with a promise the test resolves by hand, so the moment
// the segment "finishes playing" is under the test's control.
let resolvePlayout!: () => void;
const playout = new Promise<void>((resolve) => {
resolvePlayout = resolve;
});
output.audioSource.waitForPlayout = () => playout;

// Push the whole utterance (ids 0..9).
for (let i = 0; i < 10; i++) {
await output.captureFrame(frameOf(i));
}

// False interruption right at the end: the tail (5..9) is captured for replay.
output.audioSource.queuedDuration = 100;
output.pause();
expect(output.replayFrames.length).toBe(5);

// Segment completes normally: clearQueue (from pause) let waitForPlayout
// resolve and no clearBuffer fired, so interrupted === false.
const task = output.waitForPlayoutTask(new AbortController());
resolvePlayout();
await task;
// The pending replay tail must have been cleared when the segment finished.
expect(output.replayFrames.length).toBe(0);

// Next utterance must not be prefixed with the stale tail.
output.resume();
await output.captureFrame(frameOf(10));
expect(captured).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
});
});
65 changes: 65 additions & 0 deletions agents/src/voice/room_io/_output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,18 @@ export class ParticipantAudioOutput extends AudioOutput {
/** Gate held closed while the output is paused; frame forwarding awaits it. */
private playbackEnabledFuture: Future<void> = new Future();

// Rolling window of the most recently pushed frames (covering ~queueSizeMs),
// used to recover the unplayed tail that clearQueue() drops on pause(). On a
// false interruption (pause -> resume) these are replayed so no audio is lost;
// on a real interruption (clearBuffer) they are discarded.
private recentFrames: AudioFrame[] = [];
private recentFramesMs: number = 0;
private replayFrames: AudioFrame[] = [];

/** Returns the duration of `frame` in milliseconds, from its sample count and sample rate. */
private static frameMs(frame: AudioFrame): number {
  const { samplesPerChannel, sampleRate } = frame;
  const seconds = samplesPerChannel / sampleRate;
  return seconds * 1000;
}

constructor(room: Room, options: AudioOutputOptions) {
super(options.sampleRate, undefined, { pause: true });
this.room = room;
Expand All @@ -401,6 +413,19 @@ export class ParticipantAudioOutput extends AudioOutput {
if (this.playbackEnabledFuture.done) {
this.playbackEnabledFuture = new Future();
}
// Capture the unplayed tail (what clearQueue is about to drop) so a false
// interruption can replay it on resume instead of losing it forever.
const queuedMs = this.audioSource.queuedDuration;
if (queuedMs > 0 && this.recentFrames.length > 0) {
const tail: AudioFrame[] = [];
let acc = 0;
for (let i = this.recentFrames.length - 1; i >= 0 && acc < queuedMs; i--) {
const f = this.recentFrames[i]!;
tail.unshift(f);
acc += ParticipantAudioOutput.frameMs(f);
}
this.replayFrames = tail;
}
// Drop already-buffered audio so playback stops promptly instead of draining the prebuffer.
this.audioSource.clearQueue();
super.pause();
Expand Down Expand Up @@ -434,6 +459,18 @@ export class ParticipantAudioOutput extends AudioOutput {
}
}

// Replay the unplayed tail that pause() dropped, when the pause ended in a
// resume (false interruption). These frames were already counted in
// pushedDuration on their first push, so don't recount them.
if (this.replayFrames.length > 0) {
const replay = this.replayFrames;
this.replayFrames = [];
for (const rf of replay) {
await this.audioSource.captureFrame(rf);
this.trackRecentFrame(rf);
}
}

// Count the playback segment only after the pause/interrupt gate above. super.captureFrame
// bumps playbackSegmentsCount; if a frame interrupted-while-paused bailed at the gate after
// that bump, the count would strand ahead of playbackFinishedCount and the next
Expand All @@ -448,6 +485,25 @@ export class ParticipantAudioOutput extends AudioOutput {
// TODO(AJS-102): use frame.durationMs once available in rtc-node
this.pushedDuration += frame.samplesPerChannel / frame.sampleRate;
await this.audioSource.captureFrame(frame);
this.trackRecentFrame(frame);
}

/**
 * Records `frame` in the rolling history of pushed frames and trims the oldest
 * entries. pause() reads this history to recover the unplayed tail
 * (at most `queuedDuration` worth of audio) that clearQueue() discards.
 *
 * Trimming removes the oldest frame only while the history that would remain
 * still spans at least the retention cap, and it never removes the last frame.
 */
private trackRecentFrame(frame: AudioFrame): void {
  // Retention cap: the configured queue size (1000ms when unset) plus 200ms of
  // headroom. Per the original implementation note, queuedDuration can briefly
  // exceed the nominal queue size, and the history must still cover the whole
  // unplayed tail when that happens.
  const retainMs = (this.options.queueSizeMs ?? 1000) + 200;

  this.recentFrames.push(frame);
  this.recentFramesMs += ParticipantAudioOutput.frameMs(frame);

  while (this.recentFrames.length > 1) {
    const oldestMs = ParticipantAudioOutput.frameMs(this.recentFrames[0]!);
    const remainingMs = this.recentFramesMs - oldestMs;
    if (remainingMs >= retainMs) {
      // Dropping the oldest frame still leaves enough history; discard it.
      this.recentFrames.shift();
      this.recentFramesMs -= oldestMs;
    } else {
      break;
    }
  }
}

private async waitForPlayoutTask(abortController: AbortController): Promise<void> {
Expand Down Expand Up @@ -483,6 +539,14 @@ export class ParticipantAudioOutput extends AudioOutput {

this.pushedDuration = 0;
this.firstFrameEmitted = false;
// Segment finished: drop the rolling window and any pending replay tail. A
// replay tail belongs to the segment that just ended: mid-utterance false
// interruptions already consume it on the next captureFrame() before flush,
// so anything still pending here is end-of-utterance and must be dropped —
// otherwise it leaks into the start of the next utterance.
this.recentFrames = [];
this.recentFramesMs = 0;
this.replayFrames = [];

this.onPlaybackFinished({
playbackPosition: pushedDuration,
Expand Down Expand Up @@ -522,6 +586,7 @@ export class ParticipantAudioOutput extends AudioOutput {
}

clearBuffer(): void {
this.replayFrames = [];
// Signal interruption even if no frame has been pushed yet, so a gated captureFrame can bail.
if (!this.interruptedFuture.done) {
this.interruptedFuture.resolve();
Expand Down
8 changes: 7 additions & 1 deletion agents/src/voice/room_io/room_io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ export interface RoomOutputOptions {
/** Maximum queue size in milliseconds for the audio output buffer.
When TTS generates audio faster than real-time, a larger queue prevents
early frames from being discarded by the ring buffer.
Defaults to the AudioSource internal default (1000ms).
Defaults to 200ms (matching Python), down from the rtc-node AudioSource
internal default of 1000ms. Raise this if a bursty TTS provider drops
frames with the smaller prebuffer.
*/
queueSizeMs?: number;
/** Send the transcription as a JSON dict for each chunk on the `lk.transcription`
Expand Down Expand Up @@ -137,6 +139,10 @@ const DEFAULT_ROOM_OUTPUT_OPTIONS: RoomOutputOptions = {
syncTranscription: true,
audioPublishOptions: new TrackPublishOptions({ source: TrackSource.SOURCE_MICROPHONE }),
jsonFormat: false,
// Match Python (_output.py: queue_size_ms=200). The rtc-node AudioSource
// default is 1000ms; a smaller prebuffer keeps the playout queue close to
// realtime so interruptions take effect promptly.
queueSizeMs: 200,
};
Comment thread
toubatbrian marked this conversation as resolved.

export class RoomIO {
Expand Down
Loading