Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agents/src/llm/fallback_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ class FallbackLLMStream extends LLMStream {
}

// Handle timeout errors
if (error instanceof Error && error.name === 'AbortError') {
if ((error as { name?: string })?.name === 'AbortError') {
if (checkRecovery) {
this._log.warn({ llm: llm.label() }, 'recovery timed out');
} else {
Expand Down
2 changes: 1 addition & 1 deletion agents/src/voice/agent_activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1608,7 +1608,7 @@ export class AgentActivity implements RecognitionHooks {
const onAbort = () => waitInactiveTask.cancel();
signal.addEventListener('abort', onAbort, { once: true });
const waitInactiveResult = waitInactiveTask.result.catch((error) => {
if (error instanceof Error && error.name === 'AbortError') {
if ((error as { name?: string })?.name === 'AbortError') {
return;
}
throw error;
Expand Down
37 changes: 28 additions & 9 deletions agents/src/voice/audio_recognition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,8 @@ export class AudioRecognition {
// provider's logs for debugging.
private sttRequestIds: string[] = [];

private vadInputStream: ReadableStream<AudioFrame>;
private sttInputStream: ReadableStream<AudioFrame>;
private vadInputStream: ReadableStream<AudioFrame> | null = null;
private sttInputStream: ReadableStream<AudioFrame> | null = null;
/**
* Active subscriber writers fed from {@link subscribersBroadcast}. Each
* {@link subscribeAudioStream} call appends one entry; entries are dropped
Expand Down Expand Up @@ -498,13 +498,31 @@ export class AudioRecognition {
);
this.interruptionStreamChannel = createStreamChannel();
this.interruptionStreamChannel.addStreamInput(inputStream);
} else {
} else if (opts.vad) {
const [vadInputStream, sttInputStream] = primaryInputStream.tee();
this.vadInputStream = vadInputStream;
this.sttInputStream = mergeReadableStreams(
replaceSttInputWithSilence(sttInputStream),
this.silenceAudioTransform.readable,
);
Comment on lines +501 to 507

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 VAD-only (no STT) case still creates unconsumed sttInputStream

The else if (opts.vad) branch at line 501 creates a two-way tee and sets both vadInputStream and sttInputStream. If opts.stt is falsy, sttInputStream is never consumed by forwardInputAudioToStt (which now has a null guard), but the merged stream created at line 504-507 — including the tee branch — has no reader. With web streams, an unconsumed tee branch buffers indefinitely. This is the same behavior as the old code (which always tee'd regardless of STT presence), so it's a pre-existing concern rather than a regression. In practice, VAD-without-STT configurations are likely very rare.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

} else if (opts.stt) {
this.sttInputStream = mergeReadableStreams(
replaceSttInputWithSilence(primaryInputStream),
this.silenceAudioTransform.readable,
);
} else {
// No VAD or STT consumer — drain the primary stream so the broadcast
// transform keeps flowing without queuing frames indefinitely.
const reader = primaryInputStream.getReader();
void (async () => {
try {
while (!(await reader.read()).done);
} catch {
/* stream closed */
} finally {
reader.releaseLock();
}
})();
}
this.silenceAudioWriter = this.silenceAudioTransform.writable.getWriter();
}
Expand Down Expand Up @@ -1729,8 +1747,8 @@ export class AudioRecognition {
this.logger.debug('EOU detection task completed');
})
.catch((err: unknown) => {
if (err instanceof Error && err.name === 'AbortError') {
// ignore aborted errors
if ((err as { name?: string })?.name === 'AbortError') {
// ignore aborted errors (DOMException from AbortSignal is not instanceof Error)
return;
}
this.logger.error(err, 'Error in EOU detection task:');
Expand All @@ -1745,7 +1763,7 @@ export class AudioRecognition {
try {
await this.bounceEOUTask.result;
} catch (error) {
if (error instanceof Error && error.name === 'AbortError') {
if ((error as { name?: string })?.name === 'AbortError') {
return;
}
throw error;
Expand Down Expand Up @@ -1830,6 +1848,7 @@ export class AudioRecognition {
}

private async forwardInputAudioToStt(pipeline: STTPipeline, signal: AbortSignal) {
if (!this.sttInputStream) return;
for await (const frame of readStream(this.sttInputStream, signal)) {
await pipeline.audioChannel.write(frame);
}
Expand All @@ -1842,11 +1861,11 @@ export class AudioRecognition {
}

private async createVadTask(vad: VAD | undefined, signal: AbortSignal) {
if (!vad) return;
if (!vad || !this.vadInputStream) return;

const vadStream = vad.stream();
this.vadStream = vadStream;
vadStream.updateInputStream(this.vadInputStream);
vadStream.updateInputStream(this.vadInputStream!);

const abortHandler = () => {
vadStream.detachInputStream();
Expand Down Expand Up @@ -2264,7 +2283,7 @@ export class AudioRecognition {
this.logger.debug('User turn committed');
})
.catch((err: unknown) => {
if (err instanceof Error && err.name === 'AbortError') {
if ((err as { name?: string })?.name === 'AbortError') {
this.logger.debug('User turn commit task cancelled');
return;
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/elevenlabs/src/stt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ export class STT extends stt.STT {
if (error instanceof APIStatusError) {
throw new APIConnectionError({ message: error.message });
}
if (error instanceof Error && error.name === 'AbortError') {
if ((error as { name?: string })?.name === 'AbortError') {
throw new APITimeoutError({});
}
throw new APIConnectionError({});
Expand Down
2 changes: 1 addition & 1 deletion plugins/google/src/beta/gemini_tts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ export class ChunkedStream extends tts.ChunkedStream {

sendLastFrame(true);
} catch (error: unknown) {
if (error instanceof Error && error.name === 'AbortError') {
if ((error as { name?: string })?.name === 'AbortError') {
return;
}
if (isAPIError(error)) throw error;
Expand Down
7 changes: 2 additions & 5 deletions plugins/openai/src/realtime/realtime_model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1152,13 +1152,10 @@ export class RealtimeSession extends llm.RealtimeSession {

private async runWs(wsConn: WebSocket): Promise<void> {
const forwardEvents = async (signal: AbortSignal): Promise<void> => {
const abortFuture = new Future<void>();
signal.addEventListener('abort', () => abortFuture.resolve());

while (!this.#closed && wsConn.readyState === WebSocket.OPEN && !signal.aborted) {
try {
const event = await Promise.race([this.messageChannel.get(), abortFuture.await]);
if (signal.aborted || abortFuture.done || event === undefined) {
const event = await this.messageChannel.get({ signal });
if (signal.aborted || event === undefined) {
break;
}

Expand Down
2 changes: 1 addition & 1 deletion plugins/openai/src/tts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ export class ChunkedStream extends tts.ChunkedStream {

this.queue.close();
} catch (error) {
if (error instanceof Error && error.name === 'AbortError') {
if ((error as { name?: string })?.name === 'AbortError') {
return;
}
throw error;
Expand Down