fix: drain unused audio tee branches to prevent unbounded memory growth#1878
fix: drain unused audio tee branches to prevent unbounded memory growth#1878tsushanth wants to merge 2 commits into
Conversation
DOMException (thrown by AbortSignal/AbortController) is not instanceof
Error in Node and Bun, so guards of the form:
if (err instanceof Error && err.name === 'AbortError')
never match. The instanceof check is redundant for the name property
since any thrown value can carry it. Replace all six occurrences across
agents and plugins with:
if ((err as { name?: string })?.name === 'AbortError')
This silences the spurious error-level log noise on normal turn and
session teardown that triggered livekit#1712.
When no VAD or STT consumer is configured (e.g. realtime LLM turn detection only), audio_recognition.ts was unconditionally calling .tee() on the primary input stream and assigning both branches to vadInputStream and sttInputStream. Neither branch was ever consumed, causing the broadcast transform to buffer indefinitely — roughly 300 KB/s RSS growth under sustained input. Make .tee() conditional: tee into both branches only when both VAD and STT consumers exist, tee into one branch when only VAD exists, skip the tee entirely when only STT exists (pass primaryInputStream directly), and drain the stream with a background reader when neither consumer is present so the broadcast transform keeps flowing. Also remove the abortFuture / Promise.race pattern in RealtimeSession.forwardEvents: Queue.get already accepts an AbortSignal, so the auxiliary Future and event-listener were redundant and prevented the signal listener from being GC'd until the queue resolved.
|
| } else if (opts.vad) { | ||
| const [vadInputStream, sttInputStream] = primaryInputStream.tee(); | ||
| this.vadInputStream = vadInputStream; | ||
| this.sttInputStream = mergeReadableStreams( | ||
| replaceSttInputWithSilence(sttInputStream), | ||
| this.silenceAudioTransform.readable, | ||
| ); |
There was a problem hiding this comment.
🚩 VAD-only (no STT) case still creates unconsumed sttInputStream
The else if (opts.vad) branch at line 501 creates a two-way tee and sets both vadInputStream and sttInputStream. If opts.stt is falsy, sttInputStream is never consumed by forwardInputAudioToStt (which now has a null guard), but the merged stream created at line 504-507 — including the tee branch — has no reader. With web streams, an unconsumed tee branch buffers indefinitely. This is the same behavior as the old code (which always tee'd regardless of STT presence), so it's a pre-existing concern rather than a regression. In practice, VAD-without-STT configurations are likely very rare.
Was this helpful? React with 👍 or 👎 to provide feedback.
When no VAD or STT consumer is configured — the common case with realtime LLM turn detection —
audio_recognition.tsunconditionally calls.tee()on the primary input stream and assigns both branches tovadInputStreamandsttInputStream. Neither branch is ever read, so the broadcastTransformStreamqueues every incoming audio frame with no backpressure release. In a sustained session this produces roughly 300 KB/s of RSS growth until the process OOMs or the connection drops.The fix makes
.tee()conditional on which consumers are actually present. When both VAD and STT are configured the stream is tee'd into both branches as before. When only VAD is present the stream is tee'd once. When only STT is present the primary stream is passed directly without any tee. When neither consumer exists a lightweight background reader drains the primary stream so the upstream broadcast transform stays unblocked without accumulating frames.The corresponding null guards in
forwardInputAudioToSttandcreateVadTasklet TypeScript enforce the invariant and provide an explicit early-return path for callers that race before the stream is established.A second, smaller leak lives in
RealtimeSession.forwardEventsin the OpenAI realtime plugin. Each call allocated aFuture, registered an'abort'listener on the signal, and then raced the channel get against that future. BecauseQueue.getalready accepts anAbortSignaldirectly, theFutureand its listener were redundant and kept the listener registered on the signal until the channel's pending promise resolved — preventing GC of the closure in long-lived sessions. The fix passes the signal intoQueue.getdirectly and removes the auxiliary future entirely.Fixes #1462