Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/js/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export { AudioClient, AudioClientSettings } from './openai/audioClient.js';
export { EmbeddingClient } from './openai/embeddingClient.js';
export { LiveAudioTranscriptionSession, LiveAudioTranscriptionOptions } from './openai/liveAudioTranscriptionClient.js';
export type { LiveAudioTranscriptionResponse, TranscriptionContentPart } from './openai/liveAudioTranscriptionTypes.js';
export { CoreError } from './openai/liveAudioTranscriptionTypes.js';
export { ResponsesClient, ResponsesClientSettings, getOutputText } from './openai/responsesClient.js';
export { ModelLoadManager } from './detail/modelLoadManager.js';
/** @internal */
Expand Down
140 changes: 115 additions & 25 deletions sdk/js/src/openai/liveAudioTranscriptionClient.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { CoreInterop } from '../detail/coreInterop.js';
import { LiveAudioTranscriptionResponse, parseTranscriptionResult, tryParseCoreError } from './liveAudioTranscriptionTypes.js';
import { LiveAudioTranscriptionResponse, parseTranscriptionResult, wrapCoreError } from './liveAudioTranscriptionTypes.js';

/**
* Audio format settings for a streaming session.
Expand Down Expand Up @@ -30,6 +30,27 @@ export class LiveAudioTranscriptionOptions {
}
}

/**
* DOMException-compatible AbortError. Matches the shape thrown by native fetch/AbortController
* so callers can use `err.name === 'AbortError'` for cancellation detection.
* @internal
*/
function makeAbortError(message = 'The operation was aborted.'): Error {
const err = new Error(message);
err.name = 'AbortError';
return err;
}

/**
* If `signal` is already aborted, throw an AbortError immediately.
* @internal
*/
function throwIfAborted(signal: AbortSignal | undefined): void {
if (signal?.aborted) {
throw makeAbortError(signal.reason instanceof Error ? signal.reason.message : 'The operation was aborted.');
Comment thread
rui-ren marked this conversation as resolved.
Outdated
}
}

/**
* Internal async queue that acts like C#'s Channel<T>.
* Supports a single consumer reading via async iteration and multiple producers writing.
Expand Down Expand Up @@ -193,11 +214,14 @@ export class LiveAudioTranscriptionSession {
* Start a real-time audio streaming session.
* Must be called before append() or getTranscriptionStream().
* Settings are frozen after this call.
*
* @param signal - Optional AbortSignal. If aborted before or during start, an AbortError is thrown.
*/
public async start(): Promise<void> {
public async start(signal?: AbortSignal): Promise<void> {
if (this.started) {
throw new Error('Streaming session already started. Call stop() first.');
}
throwIfAborted(signal);

this.activeSettings = this.settings.snapshot();
this.outputQueue = new AsyncQueue<LiveAudioTranscriptionResponse>();
Expand Down Expand Up @@ -225,10 +249,7 @@ export class LiveAudioTranscriptionSession {
throw new Error('Native core did not return a session handle.');
}
} catch (error) {
const err = new Error(
`Error starting audio stream session: ${error instanceof Error ? error.message : String(error)}`,
{ cause: error }
);
const err = wrapCoreError('Error starting audio stream session: ', error);
this.outputQueue.complete(err);
throw err;
}
Expand All @@ -237,25 +258,64 @@ export class LiveAudioTranscriptionSession {
this.stopped = false;

this.sessionAbortController = new AbortController();
if (signal) {
const onAbort = () => this.handleExternalAbort(signal);
if (signal.aborted) {
onAbort();
} else {
signal.addEventListener('abort', onAbort, { once: true });
Comment thread
rui-ren marked this conversation as resolved.
Outdated
}
}
this.pushLoopPromise = this.pushLoop();
}

/**
* Handle an external AbortSignal firing while the session is active.
* Tears down the session by completing internal queues with an AbortError.
* @internal
*/
private handleExternalAbort(signal: AbortSignal): void {
if (this.stopped || !this.started) return;
const err = makeAbortError(signal.reason instanceof Error ? signal.reason.message : 'The operation was aborted.');
this.stopped = true;
this.started = false;
this.sessionAbortController?.abort();
Comment thread
rui-ren marked this conversation as resolved.
Outdated
this.pushQueue?.complete(err);
this.outputQueue?.complete(err);
}

/**
* Push a chunk of raw PCM audio data to the streaming session.
* Can be called from any context. Chunks are internally queued
* and serialized to native core one at a time.
*
* @param pcmData - Raw PCM audio bytes matching the configured format.
* @param signal - Optional AbortSignal. If aborted while waiting for queue capacity, an AbortError is thrown.
*/
public async append(pcmData: Uint8Array): Promise<void> {
public async append(pcmData: Uint8Array, signal?: AbortSignal): Promise<void> {
if (!this.started || this.stopped) {
throw new Error('No active streaming session. Call start() first.');
}
throwIfAborted(signal);

const copy = new Uint8Array(pcmData.length);
copy.set(pcmData);

await this.pushQueue!.write(copy);
if (!signal) {
await this.pushQueue!.write(copy);
return;
}

// Race the queue write against the abort signal.
const writePromise = this.pushQueue!.write(copy);
const abortPromise = new Promise<never>((_, reject) => {
const onAbort = () => reject(makeAbortError(
signal.reason instanceof Error ? signal.reason.message : 'The operation was aborted.'
));
Comment thread
rui-ren marked this conversation as resolved.
Outdated
if (signal.aborted) onAbort();
else signal.addEventListener('abort', onAbort, { once: true });
});
await Promise.race([writePromise, abortPromise]);
}

/**
Expand Down Expand Up @@ -291,12 +351,9 @@ export class LiveAudioTranscriptionSession {
}
} catch (error) {
const errorMsg = error instanceof Error ? error.message : String(error);
const errorInfo = tryParseCoreError(errorMsg);

const fatalError = new Error(
`Push failed (code=${errorInfo?.code ?? 'UNKNOWN'}): ${errorMsg}`,
{ cause: error }
);
const fatalError = wrapCoreError(`Push failed: `, error);
// Preserve the previous "Push failed (code=...)" prefix in the message for log compatibility.
(fatalError as { message: string }).message = `Push failed (code=${fatalError.code}): ${errorMsg}`;
this.stopped = true;
this.started = false;
this.pushQueue?.complete(fatalError);
Expand All @@ -317,33 +374,56 @@ export class LiveAudioTranscriptionSession {
* Get the async iterable of transcription results.
* Results arrive as the native ASR engine processes audio data.
*
* @param signal - Optional AbortSignal. If aborted, iteration ends with an AbortError.
*
* Usage:
* ```ts
* for await (const result of client.getTranscriptionStream()) {
* console.log(result.content[0].text);
* }
* ```
*/
public async *getTranscriptionStream(): AsyncGenerator<LiveAudioTranscriptionResponse> {
public async *getTranscriptionStream(signal?: AbortSignal): AsyncGenerator<LiveAudioTranscriptionResponse> {
if (!this.outputQueue) {
throw new Error('No active streaming session. Call start() first.');
}
if (this.streamConsumed) {
throw new Error('getTranscriptionStream() can only be called once per session. The output stream has already been consumed.');
}
this.streamConsumed = true;
throwIfAborted(signal);

// If a signal is provided, complete the output queue with an AbortError on abort
// so the pending iterator yield rejects promptly.
const queue = this.outputQueue;
let onAbort: (() => void) | null = null;
if (signal) {
onAbort = () => queue.complete(makeAbortError(
signal.reason instanceof Error ? signal.reason.message : 'The operation was aborted.'
));
signal.addEventListener('abort', onAbort, { once: true });
}

for await (const item of this.outputQueue) {
yield item;
try {
for await (const item of queue) {
yield item;
}
} finally {
if (signal && onAbort) {
signal.removeEventListener('abort', onAbort);
}
}
}

/**
* Signal end-of-audio and stop the streaming session.
* Any remaining buffered audio in the push queue will be drained to native core first.
* Final results are delivered through getTranscriptionStream() before it completes.
*
* @param signal - Optional AbortSignal. If aborted while draining the push queue, drain is
* short-circuited and the native session is stopped immediately.
*/
public async stop(): Promise<void> {
public async stop(signal?: AbortSignal): Promise<void> {
if (!this.started || this.stopped) {
return;
}
Expand All @@ -353,12 +433,25 @@ export class LiveAudioTranscriptionSession {
this.pushQueue?.complete();

if (this.pushLoopPromise) {
await this.pushLoopPromise;
if (signal) {
// Allow the caller to short-circuit the drain via abort.
const abortPromise = new Promise<void>((resolve) => {
const onAbort = () => {
this.sessionAbortController?.abort();
resolve();
};
if (signal.aborted) onAbort();
else signal.addEventListener('abort', onAbort, { once: true });
});
await Promise.race([this.pushLoopPromise, abortPromise]);
Comment thread
rui-ren marked this conversation as resolved.
Outdated
} else {
await this.pushLoopPromise;
}
}

this.sessionAbortController?.abort();

let stopError: Error | null = null;
let stopError: unknown = null;
try {
const responseData = this.coreInterop.executeCommand("audio_stream_stop", {
Params: { SessionHandle: this.sessionHandle! }
Expand All @@ -376,7 +469,7 @@ export class LiveAudioTranscriptionSession {
}
}
} catch (error) {
stopError = error instanceof Error ? error : new Error(String(error));
stopError = error;
}

this.sessionHandle = null;
Expand All @@ -386,10 +479,7 @@ export class LiveAudioTranscriptionSession {
this.outputQueue?.complete();

if (stopError) {
throw new Error(
`Error stopping audio stream session: ${stopError.message}`,
{ cause: stopError }
);
throw wrapCoreError('Error stopping audio stream session: ', stopError);
}
}

Expand Down
35 changes: 35 additions & 0 deletions sdk/js/src/openai/liveAudioTranscriptionTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,38 @@ export function tryParseCoreError(errorString: string): CoreErrorResponse | null
}
return null;
}

/**
* Error thrown by live audio streaming operations when the native core reports a failure.
* Surfaces structured fields (code, isTransient) so callers can implement targeted retry
* or telemetry logic instead of string-matching on `message`.
*
* `code` is `'UNKNOWN'` when the underlying error is not a structured CoreErrorResponse.
*/
export class CoreError extends Error {
Comment thread
rui-ren marked this conversation as resolved.
Outdated
/** Machine-readable error code from the native core, or `'UNKNOWN'`. */
public readonly code: string;
/** Whether the underlying core error is transient (caller may retry). */
public readonly isTransient: boolean;

constructor(message: string, code: string, isTransient: boolean, options?: { cause?: unknown }) {
super(message, options as ErrorOptions);
this.name = 'CoreError';
this.code = code;
this.isTransient = isTransient;
}
}

/**
* Wrap an arbitrary thrown value into a CoreError, parsing the underlying CoreErrorResponse
* if present. The resulting `message` keeps the existing prefix format for backwards
* compatibility with logs and troubleshooting docs.
* @internal
*/
export function wrapCoreError(prefix: string, cause: unknown): CoreError {
const causeMsg = cause instanceof Error ? cause.message : String(cause);
const info = tryParseCoreError(causeMsg);
const code = info?.code ?? 'UNKNOWN';
const isTransient = info?.isTransient ?? false;
return new CoreError(`${prefix}${causeMsg}`, code, isTransient, { cause });
}
31 changes: 30 additions & 1 deletion sdk/js/test/openai/liveAudioTranscription.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { describe, it } from 'mocha';
import { expect } from 'chai';
import { parseTranscriptionResult, tryParseCoreError } from '../../src/openai/liveAudioTranscriptionTypes.js';
import { parseTranscriptionResult, tryParseCoreError, CoreError, wrapCoreError } from '../../src/openai/liveAudioTranscriptionTypes.js';
import { LiveAudioTranscriptionOptions } from '../../src/openai/liveAudioTranscriptionClient.js';
import { getTestManager } from '../testUtils.js';

Expand Down Expand Up @@ -116,6 +116,35 @@ describe('Live Audio Transcription Types', () => {
});
});

describe('CoreError', () => {
it('should expose code and isTransient when wrapping a structured error', () => {
const cause = new Error('Command \'audio_stream_push\' failed: {"code":"BUSY","message":"Model busy","isTransient":true}');
const err = wrapCoreError('Push failed: ', cause);

expect(err).to.be.instanceOf(CoreError);
expect(err.name).to.equal('CoreError');
expect(err.code).to.equal('BUSY');
expect(err.isTransient).to.be.true;
expect(err.cause).to.equal(cause);
expect(err.message).to.contain('Push failed: ');
});

it('should default code to UNKNOWN and isTransient to false for unstructured errors', () => {
const cause = new Error('something exploded');
const err = wrapCoreError('Op failed: ', cause);

expect(err).to.be.instanceOf(CoreError);
expect(err.code).to.equal('UNKNOWN');
expect(err.isTransient).to.be.false;
});

it('should accept non-Error causes', () => {
const err = wrapCoreError('Op failed: ', 'string cause');
expect(err.code).to.equal('UNKNOWN');
expect(err.message).to.contain('string cause');
});
});

// --- E2E streaming test with synthetic PCM audio ---

describe('E2E with synthetic PCM audio', () => {
Expand Down
Loading