Skip to content
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
cbf465a
feat(sdk): stream file uploads and downloads instead of buffering in …
mishushakov Jun 12, 2026
9d72b2e
fix(js-sdk): return proper empty values from read() for empty files
mishushakov Jun 12, 2026
6cda03f
fix: address review comments on streaming reads
mishushakov Jun 12, 2026
58c5f86
Merge remote-tracking branch 'origin/main' into mishushakov/stream-wr…
mishushakov Jun 15, 2026
1ae9a34
fix(python-sdk): prevent connection leaks from abandoned stream reads
mishushakov Jun 15, 2026
22d98a3
fix(js-sdk): release stream-read connections on error and GC
mishushakov Jun 17, 2026
9b34ecf
refactor(python-sdk): drop fragile async GC net from AsyncFileStreamR…
mishushakov Jun 17, 2026
a869488
Merge remote-tracking branch 'origin/main' into mishushakov/stream-wr…
mishushakov Jun 17, 2026
a0d564f
fix(sdks): align streaming connection lifecycle across files and volumes
mishushakov Jun 17, 2026
e828699
feat(sdks): default file writes to octet-stream when data is streamable
mishushakov Jun 17, 2026
b0b1018
fix(python-sdk): give streamed file uploads the file-transfer timeout
mishushakov Jun 17, 2026
d2b7329
refactor(sdks): split volume streaming changes into a follow-up PR
mishushakov Jun 17, 2026
30ca0d0
feat(sdks): stream volume file uploads and downloads
mishushakov Jun 17, 2026
6a3ad1b
Merge remote-tracking branch 'origin/main' into mishushakov/stream-wr…
mishushakov Jun 18, 2026
7a52f73
Merge remote-tracking branch 'origin/main' into mishushakov/stream-wr…
mishushakov Jun 18, 2026
21045a2
test(python-sdk): make stream connection-leak assertions race-free
mishushakov Jun 18, 2026
eb12c66
fix(js-sdk): cancel underlying body reader when abandoned stream is GC'd
mishushakov Jun 18, 2026
ed277d0
refactor(sdks): replace stream GC nets with an idle-read timeout
mishushakov Jun 18, 2026
40a3103
feat(sdks): add per-chunk idle timeout to streamed reads and writes
mishushakov Jun 18, 2026
2022d3a
fix(python-sdk): run streamed-upload reads and gzip off the event loop
mishushakov Jun 18, 2026
cc2cd57
refactor(sdks): make the JS read idle timeout bound only the wire
mishushakov Jun 18, 2026
d2939b0
docs(python-sdk): scope the FILE_TIMEOUT comment to volume transfers
mishushakov Jun 18, 2026
8c5b640
docs(python-sdk): note envd backstops the streamed-upload per-write t…
mishushakov Jun 19, 2026
24f99d2
Merge file-upload: adopt idle-read-timeout streaming infra in volumes
mishushakov Jun 19, 2026
11254d8
refactor(js-sdk): use *Opts naming for volume option types
mishushakov Jun 19, 2026
b9039a2
Merge branch 'main' into mishushakov/stream-write-volumes
mishushakov Jun 22, 2026
90ae65f
fix(python-sdk): drop unused FILE_TIMEOUT from e2b.connection_config
mishushakov Jun 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/volumes-stream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"e2b": patch
"@e2b/python-sdk": patch
---

Stream volume file uploads and downloads instead of buffering in memory:

- `Volume.writeFile()` / `Volume.write_file()`: `ReadableStream` data (JS, outside the browser) and file-like objects (Python) are now streamed to the API in chunks.
- `Volume.readFile(format="stream")` / `read_file(format="stream")`: the request timeout now bounds only the initial handshake instead of killing the stream while it's being consumed (Python disables the read timeout; JS bounds the handshake and supports `signal` to cancel an in-flight stream). A dropped connection during the stream handshake now surfaces the same typed, health-checked error as non-stream reads.
8 changes: 6 additions & 2 deletions packages/js-sdk/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,14 @@ export type {
VolumeInfo,
VolumeAndToken,
VolumeEntryStat,
VolumeMetadataOptions,
VolumeWriteOptions,
VolumeMetadataOpts,
VolumeReadOpts,
VolumeWriteOpts,
VolumeApiOpts,
VolumeConnectionConfig,
// Deprecated aliases, kept for backwards compatibility.
VolumeMetadataOptions,
VolumeWriteOptions,
} from './volume'

export { Sandbox }
Expand Down
113 changes: 92 additions & 21 deletions packages/js-sdk/src/volume/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,22 @@ import {
VolumeApiOpts,
FILE_TIMEOUT_MS,
} from './client'
import { ConnectionConfig, ConnectionOpts } from '../connectionConfig'
import {
ConnectionConfig,
ConnectionOpts,
setupRequestController,
wrapStreamWithConnectionCleanup,
} from '../connectionConfig'
import { NotFoundError, VolumeError } from '../errors'
import { toBlob } from '../utils'
import { runtime, toBlob } from '../utils'
import { VolumeFileType } from './types'
import type {
VolumeAndToken,
VolumeEntryStat,
VolumeInfo,
VolumeMetadataOptions,
VolumeWriteOptions,
VolumeMetadataOpts,
VolumeReadOpts,
VolumeWriteOpts,
} from './types'

/**
Expand Down Expand Up @@ -310,7 +316,7 @@ export class Volume {
*/
async makeDir(
path: string,
opts?: VolumeWriteOptions & VolumeApiOpts
opts?: VolumeWriteOpts & VolumeApiOpts
): Promise<VolumeEntryStat> {
const config = new VolumeConnectionConfig(this, opts)
const client = new VolumeApiClient(config)
Expand Down Expand Up @@ -425,7 +431,7 @@ export class Volume {
*/
async updateMetadata(
path: string,
metadata: VolumeMetadataOptions,
metadata: VolumeMetadataOpts,
opts?: VolumeApiOpts
): Promise<VolumeEntryStat> {
const config = new VolumeConnectionConfig(this, opts)
Expand Down Expand Up @@ -479,7 +485,7 @@ export class Volume {
*/
async readFile(
path: string,
opts?: VolumeApiOpts & { format?: 'text' }
opts?: VolumeReadOpts & { format?: 'text' }
): Promise<string>
/**
* Read file content as a `Uint8Array`.
Expand All @@ -494,7 +500,7 @@ export class Volume {
*/
async readFile(
path: string,
opts?: VolumeApiOpts & { format: 'bytes' }
opts?: VolumeReadOpts & { format: 'bytes' }
): Promise<Uint8Array>
/**
* Read file content as a `Blob`.
Expand All @@ -509,7 +515,7 @@ export class Volume {
*/
async readFile(
path: string,
opts?: VolumeApiOpts & { format: 'blob' }
opts?: VolumeReadOpts & { format: 'blob' }
): Promise<Blob>
/**
* Read file content as a `ReadableStream`.
Expand All @@ -524,11 +530,11 @@ export class Volume {
*/
async readFile(
path: string,
opts?: VolumeApiOpts & { format: 'stream' }
opts?: VolumeReadOpts & { format: 'stream' }
): Promise<ReadableStream<Uint8Array>>
async readFile(
path: string,
opts?: VolumeApiOpts & {
opts?: VolumeReadOpts & {
format?: 'text' | 'stream' | 'bytes' | 'blob'
}
): Promise<unknown> {
Expand All @@ -539,6 +545,60 @@ export class Volume {
})
const client = new VolumeApiClient(config)

if (format === 'stream') {
// The request timeout bounds only the initial handshake; once the
// response arrives, the stream lives until it's consumed, cancelled, the
// user signal aborts, or the per-chunk idle timeout fires. Matches the
// sandbox `files.read` stream path.
const { controller, clearStartTimeout, cleanup } = setupRequestController(
config.requestTimeoutMs,
opts?.signal
)

try {
const res = await client.api.GET('/volumecontent/{volumeID}/file', {
params: {
path: { volumeID: this.volumeId },
query: { path },
},
parseAs: 'stream',
signal: controller.signal,
})

if (res.response.status === 404) {
// Cancel the unconsumed body so the pooled connection is released
// before we propagate.
if (res.response.body && !res.response.bodyUsed) {
await res.response.body.cancel().catch(() => {})
}
cleanup()
throw new NotFoundError(`Path ${path} not found`)
}

const err = handleApiError(res, VolumeError)
if (err) {
if (res.response.body && !res.response.bodyUsed) {
await res.response.body.cancel().catch(() => {})
}
cleanup()
throw err
}

return wrapStreamWithConnectionCleanup(
res.data as ReadableStream<Uint8Array> | null,
{
clearStartTimeout,
cleanup,
controller,
idleTimeoutMs: opts?.streamIdleTimeoutMs ?? config.requestTimeoutMs,
}
)
Comment thread
cursor[bot] marked this conversation as resolved.
} catch (err) {
cleanup()
throw err
}
}

const res = await client.api.GET('/volumecontent/{volumeID}/file', {
params: {
path: {
Expand Down Expand Up @@ -572,11 +632,8 @@ export class Volume {
return typeof res.data === 'string' ? res.data : ''
}

if (format === 'blob') {
return res.data instanceof Blob ? res.data : new Blob([])
}

return res.data instanceof ReadableStream ? res.data : new Blob([]).stream()
// format === 'blob'
return res.data instanceof Blob ? res.data : new Blob([])
}

/**
Expand All @@ -587,7 +644,7 @@ export class Volume {
* Writing to a file that already exists overwrites the file.
*
* @param path path to the file.
* @param data data to write to the file. Data can be a string, `ArrayBuffer`, `Blob`, or `ReadableStream`.
* @param data data to write to the file. Data can be a string, `ArrayBuffer`, `Blob`, or `ReadableStream`. Outside the browser, `ReadableStream` data is streamed to the API instead of being buffered in memory.
* @param options file creation options.
* @param opts connection options.
*
Expand All @@ -596,15 +653,23 @@ export class Volume {
async writeFile(
path: string,
data: string | ArrayBuffer | Blob | ReadableStream<Uint8Array>,
opts?: VolumeWriteOptions & VolumeApiOpts
opts?: VolumeWriteOpts & VolumeApiOpts
): Promise<VolumeEntryStat> {
const config = new VolumeConnectionConfig(this, {
...opts,
requestTimeoutMs: opts?.requestTimeoutMs ?? FILE_TIMEOUT_MS,
})
const client = new VolumeApiClient(config)

const blob = await toBlob(data)
// Browsers don't support streaming request bodies, so buffer there.
const isStream = data instanceof ReadableStream && runtime !== 'browser'
const body = isStream ? data : await toBlob(data)

// A streamed upload carries no client-side timeout: the socket-write
// "wire" isn't observable through fetch, and a stalled producer is the
// caller's own code, so a stuck streamed upload is bounded server-side (or
// via `opts.signal`). Buffered uploads keep the normal request timeout.
const signal = isStream ? opts?.signal : config.getSignal()

const res = await client.api.PUT('/volumecontent/{volumeID}/file', {
params: {
Expand All @@ -619,12 +684,14 @@ export class Volume {
force: opts?.force,
},
},
bodySerializer: () => blob,
bodySerializer: () => body,
body: {} as any,
headers: {
'Content-Type': 'application/octet-stream',
},
signal: config.getSignal(),
signal,
// Streaming request bodies require half-duplex mode.
...(isStream && { duplex: 'half' as const }),
})

if (res.response.status === 404) {
Expand Down Expand Up @@ -682,6 +749,10 @@ export type {
VolumeInfo,
VolumeAndToken,
VolumeEntryStat,
VolumeMetadataOpts,
VolumeReadOpts,
VolumeWriteOpts,
// Deprecated aliases, kept for backwards compatibility.
VolumeMetadataOptions,
VolumeWriteOptions,
} from './types'
Expand Down
30 changes: 27 additions & 3 deletions packages/js-sdk/src/volume/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { VolumeApiComponents } from './client'
import { VolumeApiComponents, VolumeApiOpts } from './client'

/**
* File type enum.
Expand Down Expand Up @@ -66,7 +66,7 @@ export type VolumeEntryStat = Omit<
/**
* Options for updating file metadata.
*/
export type VolumeMetadataOptions = {
export type VolumeMetadataOpts = {
/**
* User ID of the file or directory.
*/
Expand All @@ -83,13 +83,37 @@ export type VolumeMetadataOptions = {
mode?: number
}

/**
* Options for reading files from a volume.
*/
export type VolumeReadOpts = VolumeApiOpts & {
/**
* Idle timeout for a streamed read (`format: 'stream'`) in **milliseconds**:
* abort if no chunk arrives from the server within this window *while
* reading*. It bounds only the wire — a slow or paused consumer never trips
* it (a consumer that holds the stream but stops reading is reclaimed
* server-side). Defaults to the request timeout; pass `0` to disable.
*/
streamIdleTimeoutMs?: number
}

/**
* Options for file and directory operations.
*/
export type VolumeWriteOptions = VolumeMetadataOptions & {
export type VolumeWriteOpts = VolumeMetadataOpts & {
/**
* For makeDir: Create parent directories if they don't exist.
* For writeFile: Force overwrite of an existing file.
*/
force?: boolean
}

/**
* @deprecated Use {@link VolumeMetadataOpts} instead.
*/
export type VolumeMetadataOptions = VolumeMetadataOpts

/**
* @deprecated Use {@link VolumeWriteOpts} instead.
*/
export type VolumeWriteOptions = VolumeWriteOpts
47 changes: 47 additions & 0 deletions packages/js-sdk/tests/sandbox/files/read.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,50 @@ sandboxTest('read empty file in all formats', async ({ sandbox }) => {
}
expect(Buffer.concat(chunks).length).toBe(0)
})

sandboxTest('read file as stream', async ({ sandbox }) => {
const filename = 'test_read_stream.txt'
const content = 'Streamed read content. '.repeat(10_000)

await sandbox.files.write(filename, content)
const stream = await sandbox.files.read(filename, { format: 'stream' })

const chunks: Uint8Array[] = []
for await (const chunk of stream as unknown as AsyncIterable<Uint8Array>) {
chunks.push(chunk)
}
const readContent = Buffer.concat(chunks).toString('utf-8')
assert.equal(readContent, content)
})

sandboxTest('read non-existing file as stream', async ({ sandbox }) => {
const filename = 'non_existing_file.txt'

await expect(
sandbox.files.read(filename, { format: 'stream' })
).rejects.toThrowError(FileNotFoundError)
})

sandboxTest('read empty file in all formats', async ({ sandbox }) => {
const filename = 'empty-file-formats.txt'
await sandbox.commands.run(`touch ${filename}`)

const text = await sandbox.files.read(filename, { format: 'text' })
expect(text).toBe('')

const bytes = await sandbox.files.read(filename, { format: 'bytes' })
expect(bytes).toBeInstanceOf(Uint8Array)
expect(bytes.length).toBe(0)

const blob = await sandbox.files.read(filename, { format: 'blob' })
expect(blob).toBeInstanceOf(Blob)
expect(blob.size).toBe(0)

const stream = await sandbox.files.read(filename, { format: 'stream' })
expect(stream).toBeInstanceOf(ReadableStream)
const chunks: Uint8Array[] = []
for await (const chunk of stream as unknown as AsyncIterable<Uint8Array>) {
chunks.push(chunk)
}
expect(Buffer.concat(chunks).length).toBe(0)
})
Comment thread
mishushakov marked this conversation as resolved.
Loading
Loading