diff --git a/agents/src/llm/fallback_adapter.ts b/agents/src/llm/fallback_adapter.ts index 128c2392c..2c4c04da3 100644 --- a/agents/src/llm/fallback_adapter.ts +++ b/agents/src/llm/fallback_adapter.ts @@ -241,7 +241,7 @@ class FallbackLLMStream extends LLMStream { } // Handle timeout errors - if (error instanceof Error && error.name === 'AbortError') { + if ((error as { name?: string })?.name === 'AbortError') { if (checkRecovery) { this._log.warn({ llm: llm.label() }, 'recovery timed out'); } else { diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index 21a6a9fac..3279af680 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -1608,7 +1608,7 @@ export class AgentActivity implements RecognitionHooks { const onAbort = () => waitInactiveTask.cancel(); signal.addEventListener('abort', onAbort, { once: true }); const waitInactiveResult = waitInactiveTask.result.catch((error) => { - if (error instanceof Error && error.name === 'AbortError') { + if ((error as { name?: string })?.name === 'AbortError') { return; } throw error; diff --git a/agents/src/voice/audio_recognition.ts b/agents/src/voice/audio_recognition.ts index dfa72de1d..14a5dbea3 100644 --- a/agents/src/voice/audio_recognition.ts +++ b/agents/src/voice/audio_recognition.ts @@ -336,8 +336,8 @@ export class AudioRecognition { // provider's logs for debugging. private sttRequestIds: string[] = []; - private vadInputStream: ReadableStream; - private sttInputStream: ReadableStream; + private vadInputStream: ReadableStream | null = null; + private sttInputStream: ReadableStream | null = null; /** * Active subscriber writers fed from {@link subscribersBroadcast}. Each * {@link subscribeAudioStream} call appends one entry; entries are dropped @@ -498,13 +498,31 @@ export class AudioRecognition { ); this.interruptionStreamChannel = createStreamChannel(); this.interruptionStreamChannel.addStreamInput(inputStream); - } else { + } else if (opts.vad) { const [vadInputStream, sttInputStream] = primaryInputStream.tee(); this.vadInputStream = vadInputStream; this.sttInputStream = mergeReadableStreams( replaceSttInputWithSilence(sttInputStream), this.silenceAudioTransform.readable, ); + } else if (opts.stt) { + this.sttInputStream = mergeReadableStreams( + replaceSttInputWithSilence(primaryInputStream), + this.silenceAudioTransform.readable, + ); + } else { + // No VAD or STT consumer — drain the primary stream so the broadcast + // transform keeps flowing without queuing frames indefinitely. + const reader = primaryInputStream.getReader(); + void (async () => { + try { + while (!(await reader.read()).done); + } catch { + /* stream closed */ + } finally { + reader.releaseLock(); + } + })(); } this.silenceAudioWriter = this.silenceAudioTransform.writable.getWriter(); } @@ -1729,8 +1747,8 @@ export class AudioRecognition { this.logger.debug('EOU detection task completed'); }) .catch((err: unknown) => { - if (err instanceof Error && err.name === 'AbortError') { - // ignore aborted errors + if ((err as { name?: string })?.name === 'AbortError') { + // ignore aborted errors (DOMException from AbortSignal is not instanceof Error) return; } this.logger.error(err, 'Error in EOU detection task:'); @@ -1745,7 +1763,7 @@ export class AudioRecognition { try { await this.bounceEOUTask.result; } catch (error) { - if (error instanceof Error && error.name === 'AbortError') { + if ((error as { name?: string })?.name === 'AbortError') { return; } throw error; @@ -1830,6 +1848,7 @@ export class AudioRecognition { } private async forwardInputAudioToStt(pipeline: STTPipeline, signal: AbortSignal) { + if (!this.sttInputStream) return; for await (const frame of readStream(this.sttInputStream, signal)) { await pipeline.audioChannel.write(frame); } @@ -1842,11 +1861,11 @@ export class AudioRecognition { } private async createVadTask(vad: VAD | undefined, signal: AbortSignal) { - if (!vad) return; + if (!vad || !this.vadInputStream) return; const vadStream = vad.stream(); this.vadStream = vadStream; - vadStream.updateInputStream(this.vadInputStream); + vadStream.updateInputStream(this.vadInputStream!); const abortHandler = () => { vadStream.detachInputStream(); @@ -2264,7 +2283,7 @@ export class AudioRecognition { this.logger.debug('User turn committed'); }) .catch((err: unknown) => { - if (err instanceof Error && err.name === 'AbortError') { + if ((err as { name?: string })?.name === 'AbortError') { this.logger.debug('User turn commit task cancelled'); return; } diff --git a/plugins/elevenlabs/src/stt.ts b/plugins/elevenlabs/src/stt.ts index 40c7f88fe..5eecbb6cf 100644 --- a/plugins/elevenlabs/src/stt.ts +++ b/plugins/elevenlabs/src/stt.ts @@ -393,7 +393,7 @@ export class STT extends stt.STT { if (error instanceof APIStatusError) { throw new APIConnectionError({ message: error.message }); } - if (error instanceof Error && error.name === 'AbortError') { + if ((error as { name?: string })?.name === 'AbortError') { throw new APITimeoutError({}); } throw new APIConnectionError({}); diff --git a/plugins/google/src/beta/gemini_tts.ts b/plugins/google/src/beta/gemini_tts.ts index 0d778f0c0..edd331552 100644 --- a/plugins/google/src/beta/gemini_tts.ts +++ b/plugins/google/src/beta/gemini_tts.ts @@ -267,7 +267,7 @@ export class ChunkedStream extends tts.ChunkedStream { sendLastFrame(true); } catch (error: unknown) { - if (error instanceof Error && error.name === 'AbortError') { + if ((error as { name?: string })?.name === 'AbortError') { return; } if (isAPIError(error)) throw error; diff --git a/plugins/openai/src/realtime/realtime_model.ts b/plugins/openai/src/realtime/realtime_model.ts index e8f88035d..488aaf72f 100644 --- a/plugins/openai/src/realtime/realtime_model.ts +++ b/plugins/openai/src/realtime/realtime_model.ts @@ -1152,13 +1152,10 @@ export class RealtimeSession extends llm.RealtimeSession { private async runWs(wsConn: WebSocket): Promise { const forwardEvents = async (signal: AbortSignal): Promise => { - const abortFuture = new Future(); - signal.addEventListener('abort', () => abortFuture.resolve()); - while (!this.#closed && wsConn.readyState === WebSocket.OPEN && !signal.aborted) { try { - const event = await Promise.race([this.messageChannel.get(), abortFuture.await]); - if (signal.aborted || abortFuture.done || event === undefined) { + const event = await this.messageChannel.get({ signal }); + if (signal.aborted || event === undefined) { break; } diff --git a/plugins/openai/src/tts.ts b/plugins/openai/src/tts.ts index f0261e89d..d72a23c66 100644 --- a/plugins/openai/src/tts.ts +++ b/plugins/openai/src/tts.ts @@ -147,7 +147,7 @@ export class ChunkedStream extends tts.ChunkedStream { this.queue.close(); } catch (error) { - if (error instanceof Error && error.name === 'AbortError') { + if ((error as { name?: string })?.name === 'AbortError') { return; } throw error;