Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/false-interruption-audio-replay.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@livekit/agents': patch
---

fix(room_io): stop dropping audio on false interruptions

`ParticipantAudioOutput.pause()` cleared the entire native `AudioSource` queue, permanently discarding up to `queueSizeMs` (rtc-node default: 1000ms) of generated-but-unplayed audio. On a false interruption (pause then resume) those frames were never replayed, so up to ~1s of agent speech was lost mid-sentence from both the live call and the observability recording. The output now keeps a rolling window of recently pushed frames, captures the unplayed tail on pause, and replays it on resume, while still discarding it on a real interruption (`clearBuffer()`).
139 changes: 139 additions & 0 deletions agents/src/voice/room_io/_output.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ describe('ParticipantAudioOutput captureFrame segment accounting', () => {
playbackFinishedCount: number;
playbackFinishedFuture: Future<void>;
onPlaybackStarted: (createdAt: number) => void;
options: { queueSizeMs?: number };
recentFrames: unknown[];
recentFramesMs: number;
replayFrames: unknown[];
audioSource: {
clearQueue: () => void;
captureFrame: (frame: CaptureFrameArg) => Promise<void>;
Expand All @@ -187,6 +191,11 @@ describe('ParticipantAudioOutput captureFrame segment accounting', () => {
output.playbackFinishedCount = 0;
output.playbackFinishedFuture = new Future<void>();
output.onPlaybackStarted = vi.fn();
// Object.create bypasses the constructor's field initializers; mirror them.
output.options = { queueSizeMs: 1000 };
output.recentFrames = [];
output.recentFramesMs = 0;
output.replayFrames = [];
output.audioSource = { clearQueue: vi.fn(), captureFrame: vi.fn(async () => {}) };
return output;
};
Expand Down Expand Up @@ -220,3 +229,133 @@ describe('ParticipantAudioOutput captureFrame segment accounting', () => {
expect(output.pushedDuration).toBeGreaterThan(0);
});
});

/**
* Regression tests for the false-interruption audio loss fix.
*
* Before the fix, pause() called clearQueue() which permanently dropped every
* frame already pushed to the native AudioSource queue (up to queueSizeMs). On a
* false interruption (pause then resume) the agent never replayed them, so up to
* ~1s of audio (rtc-node default queue) vanished from both the call and the
* recording. The output now keeps a rolling window of recently pushed frames,
* captures the unplayed tail on pause(), and replays it on the next captureFrame
* after resume() — while discarding it on a real interruption (clearBuffer()).
*/
describe('ParticipantAudioOutput false-interruption replay', () => {
  // 20 ms frames at 48 kHz, i.e. 960 samples per channel in every frame.
  const FRAME_DURATION_MS = 20;
  const SAMPLE_RATE = 48000;
  const SAMPLES_PER_FRAME = (SAMPLE_RATE * FRAME_DURATION_MS) / 1000;

  // View of the output that exposes the internals the tests need to seed and inspect.
  type ReplayHarness = ParticipantAudioOutput & {
    startedFuture: Future<void>;
    playbackEnabledFuture: Future<void>;
    interruptedFuture: Future<void>;
    firstFrameEmitted: boolean;
    pushedDuration: number;
    _capturing: boolean;
    playbackSegmentsCount: number;
    playbackFinishedCount: number;
    playbackFinishedFuture: Future<void>;
    onPlaybackStarted: (createdAt: number) => void;
    options: { queueSizeMs?: number };
    recentFrames: unknown[];
    recentFramesMs: number;
    replayFrames: unknown[];
    audioSource: {
      clearQueue: () => void;
      captureFrame: (frame: CaptureFrameArg) => Promise<void>;
      queuedDuration: number;
    };
  };

  /**
   * Builds a frame whose first sample carries `id`. The ids recorded by the fake
   * audio source then show the exact push order, any lost frame (id missing) and
   * any replayed frame (id repeated).
   */
  const frameOf = (id: number): CaptureFrameArg => {
    const samples = new Int16Array(SAMPLES_PER_FRAME);
    samples[0] = id;
    return {
      samplesPerChannel: SAMPLES_PER_FRAME,
      sampleRate: SAMPLE_RATE,
      data: samples,
    } as unknown as CaptureFrameArg;
  };

  /** Returns a future that is already resolved, i.e. an open gate. */
  const resolvedFuture = (): Future<void> => {
    const future = new Future<void>();
    future.resolve();
    return future;
  };

  /**
   * Creates an output without running its constructor and wires in a fake audio
   * source that appends the id of every captured frame to `captured`.
   */
  const makeOutput = (queueSizeMs: number, captured: number[]): ReplayHarness => {
    const harness = Object.create(ParticipantAudioOutput.prototype) as ReplayHarness;

    // Gates: started and playback-enabled are open, interruption is still pending.
    harness.startedFuture = resolvedFuture();
    harness.playbackEnabledFuture = resolvedFuture();
    harness.interruptedFuture = new Future<void>();

    // Segment accounting starts from a clean slate.
    harness.firstFrameEmitted = false;
    harness.pushedDuration = 0;
    harness._capturing = false;
    harness.playbackSegmentsCount = 0;
    harness.playbackFinishedCount = 0;
    harness.playbackFinishedFuture = new Future<void>();
    harness.onPlaybackStarted = vi.fn();

    // Object.create skips the constructor, so the field initializers are mirrored here.
    harness.options = { queueSizeMs };
    harness.recentFrames = [];
    harness.recentFramesMs = 0;
    harness.replayFrames = [];

    harness.audioSource = {
      clearQueue: vi.fn(),
      queuedDuration: 0,
      captureFrame: vi.fn(async (frame: CaptureFrameArg) => {
        const { data } = frame as unknown as { data: Int16Array };
        captured.push(data[0]!);
      }),
    };
    return harness;
  };

  /** Pushes frames tagged 0..9 through the output, one after another. */
  const pushInitialFrames = async (output: ReplayHarness): Promise<void> => {
    for (let id = 0; id < 10; id++) {
      await output.captureFrame(frameOf(id));
    }
  };

  /** Inclusive list of consecutive ids, e.g. ids(5, 9) is [5, 6, 7, 8, 9]. */
  const ids = (first: number, last: number): number[] =>
    Array.from({ length: last - first + 1 }, (_, offset) => first + offset);

  it('replays the unplayed tail on resume (false interruption) — zero loss', async () => {
    const captured: number[] = [];
    const output = makeOutput(100, captured);
    await pushInitialFrames(output);

    // The fake source reports 100ms (5 frames) still unplayed when the pause lands.
    output.audioSource.queuedDuration = 100;
    output.pause();
    output.resume();

    await output.captureFrame(frameOf(10));

    // First pass 0..9, then the unplayed tail 5..9 again, then the new frame 10.
    expect(captured).toEqual([...ids(0, 9), ...ids(5, 9), 10]);
  });

  it('discards the unplayed tail on clearBuffer (real interruption) — no replay', async () => {
    const captured: number[] = [];
    const output = makeOutput(100, captured);
    await pushInitialFrames(output);

    output.audioSource.queuedDuration = 100;
    output.pause();
    // A real interruption: the user cut the agent off, so the tail must not come back.
    output.clearBuffer();
    output.resume();

    await output.captureFrame(frameOf(10));

    expect(captured).toEqual(ids(0, 10));
  });

  it('does not replay when nothing was queued at pause', async () => {
    const captured: number[] = [];
    const output = makeOutput(100, captured);
    await pushInitialFrames(output);

    // Everything was already played out, so there is no tail to recover.
    output.audioSource.queuedDuration = 0;
    output.pause();
    output.resume();

    await output.captureFrame(frameOf(10));

    expect(captured).toEqual(ids(0, 10));
  });
});
64 changes: 64 additions & 0 deletions agents/src/voice/room_io/_output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,18 @@ export class ParticipantAudioOutput extends AudioOutput {
/** Gate held closed while the output is paused; frame forwarding awaits it. */
private playbackEnabledFuture: Future<void> = new Future();

// Rolling window of the most recently pushed frames (covering ~queueSizeMs),
// used to recover the unplayed tail that clearQueue() drops on pause(). On a
// false interruption (pause -> resume) these are replayed so no audio is lost;
// on a real interruption (clearBuffer) they are discarded.
//
// recentFrames: the window itself, oldest frame first; appended to and trimmed
// by trackRecentFrame(), and reset when a playback segment finishes.
private recentFrames: AudioFrame[] = [];
// recentFramesMs: summed duration, in milliseconds, of the frames currently
// held in recentFrames; kept in step with every push and trim of the window.
private recentFramesMs: number = 0;
// replayFrames: the unplayed tail captured by pause(), pending replay. It is
// drained into the audio source ahead of the next captured frame, and emptied
// without replay by clearBuffer() or when a segment finishes interrupted.
private replayFrames: AudioFrame[] = [];

/**
 * Duration of a single audio frame in milliseconds.
 *
 * @param frame - Frame whose `samplesPerChannel` and `sampleRate` define its length.
 * @returns `samplesPerChannel / sampleRate`, scaled from seconds to milliseconds.
 */
private static frameMs(frame: AudioFrame): number {
  const { samplesPerChannel, sampleRate } = frame;
  const seconds = samplesPerChannel / sampleRate;
  return seconds * 1000;
}

constructor(room: Room, options: AudioOutputOptions) {
super(options.sampleRate, undefined, { pause: true });
this.room = room;
Expand All @@ -401,6 +413,19 @@ export class ParticipantAudioOutput extends AudioOutput {
if (this.playbackEnabledFuture.done) {
this.playbackEnabledFuture = new Future();
}
// Capture the unplayed tail (what clearQueue is about to drop) so a false
// interruption can replay it on resume instead of losing it forever.
const queuedMs = this.audioSource.queuedDuration;
if (queuedMs > 0 && this.recentFrames.length > 0) {
const tail: AudioFrame[] = [];
let acc = 0;
for (let i = this.recentFrames.length - 1; i >= 0 && acc < queuedMs; i--) {
const f = this.recentFrames[i]!;
tail.unshift(f);
acc += ParticipantAudioOutput.frameMs(f);
}
this.replayFrames = tail;
}
// Drop already-buffered audio so playback stops promptly instead of draining the prebuffer.
this.audioSource.clearQueue();
super.pause();
Expand Down Expand Up @@ -434,6 +459,18 @@ export class ParticipantAudioOutput extends AudioOutput {
}
}

// Replay the unplayed tail that pause() dropped, when the pause ended in a
// resume (false interruption). These frames were already counted in
// pushedDuration on their first push, so don't recount them.
if (this.replayFrames.length > 0) {
const replay = this.replayFrames;
this.replayFrames = [];
for (const rf of replay) {
await this.audioSource.captureFrame(rf);
this.trackRecentFrame(rf);
}
}

// Count the playback segment only after the pause/interrupt gate above. super.captureFrame
// bumps playbackSegmentsCount; if a frame interrupted-while-paused bailed at the gate after
// that bump, the count would strand ahead of playbackFinishedCount and the next
Expand All @@ -448,6 +485,25 @@ export class ParticipantAudioOutput extends AudioOutput {
// TODO(AJS-102): use frame.durationMs once available in rtc-node
this.pushedDuration += frame.samplesPerChannel / frame.sampleRate;
await this.audioSource.captureFrame(frame);
this.trackRecentFrame(frame);
}

/**
 * Records `frame` in the rolling window of recently pushed frames and trims the
 * oldest entries so the window stays just above its duration cap.
 *
 * The window exists so that pause() can recover the unplayed tail (at most
 * `queuedDuration` worth of audio) that clearing the native queue throws away.
 *
 * @param frame - Frame that has just been handed to the audio source.
 */
private trackRecentFrame(frame: AudioFrame): void {
  // Retain 200ms more than queueSizeMs (1000ms when unset): queuedDuration can
  // briefly exceed the nominal queue size, and the whole unplayed tail must
  // still be present in the window when that happens.
  const windowCapMs = (this.options.queueSizeMs ?? 1000) + 200;

  this.recentFrames.push(frame);
  this.recentFramesMs += ParticipantAudioOutput.frameMs(frame);

  // Evict from the front only while the frames left behind would still cover
  // the cap; the newest frame is never evicted.
  while (this.recentFrames.length > 1) {
    const oldestMs = ParticipantAudioOutput.frameMs(this.recentFrames[0]!);
    const remainingMs = this.recentFramesMs - oldestMs;
    if (!(remainingMs >= windowCapMs)) {
      break;
    }
    this.recentFrames.shift();
    this.recentFramesMs = remainingMs;
  }
}

private async waitForPlayoutTask(abortController: AbortController): Promise<void> {
Expand Down Expand Up @@ -483,6 +539,13 @@ export class ParticipantAudioOutput extends AudioOutput {

this.pushedDuration = 0;
this.firstFrameEmitted = false;
// Segment finished: drop the rolling window. On an interruption also drop any
// pending replay tail (the user chose to cut the agent off).
this.recentFrames = [];
this.recentFramesMs = 0;
if (interrupted) {
this.replayFrames = [];
Comment thread
toubatbrian marked this conversation as resolved.
Outdated
}
Comment thread
toubatbrian marked this conversation as resolved.
Outdated

this.onPlaybackFinished({
playbackPosition: pushedDuration,
Expand Down Expand Up @@ -522,6 +585,7 @@ export class ParticipantAudioOutput extends AudioOutput {
}

clearBuffer(): void {
this.replayFrames = [];
// Signal interruption even if no frame has been pushed yet, so a gated captureFrame can bail.
if (!this.interruptedFuture.done) {
this.interruptedFuture.resolve();
Expand Down
4 changes: 4 additions & 0 deletions agents/src/voice/room_io/room_io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ const DEFAULT_ROOM_OUTPUT_OPTIONS: RoomOutputOptions = {
syncTranscription: true,
audioPublishOptions: new TrackPublishOptions({ source: TrackSource.SOURCE_MICROPHONE }),
jsonFormat: false,
// Match Python (_output.py: queue_size_ms=200). The rtc-node AudioSource
// default is 1000ms; a smaller prebuffer keeps the playout queue close to
// realtime so interruptions take effect promptly.
queueSizeMs: 200,
};
Comment thread
toubatbrian marked this conversation as resolved.

export class RoomIO {
Expand Down
Loading