Merged
Changes from 14 commits
Commits
23 commits
cbf465a
feat(sdk): stream file uploads and downloads instead of buffering in …
mishushakov Jun 12, 2026
9d72b2e
fix(js-sdk): return proper empty values from read() for empty files
mishushakov Jun 12, 2026
6cda03f
fix: address review comments on streaming reads
mishushakov Jun 12, 2026
58c5f86
Merge remote-tracking branch 'origin/main' into mishushakov/stream-wr…
mishushakov Jun 15, 2026
1ae9a34
fix(python-sdk): prevent connection leaks from abandoned stream reads
mishushakov Jun 15, 2026
22d98a3
fix(js-sdk): release stream-read connections on error and GC
mishushakov Jun 17, 2026
9b34ecf
refactor(python-sdk): drop fragile async GC net from AsyncFileStreamR…
mishushakov Jun 17, 2026
a869488
Merge remote-tracking branch 'origin/main' into mishushakov/stream-wr…
mishushakov Jun 17, 2026
a0d564f
fix(sdks): align streaming connection lifecycle across files and volumes
mishushakov Jun 17, 2026
e828699
feat(sdks): default file writes to octet-stream when data is streamable
mishushakov Jun 17, 2026
b0b1018
fix(python-sdk): give streamed file uploads the file-transfer timeout
mishushakov Jun 17, 2026
d2b7329
refactor(sdks): split volume streaming changes into a follow-up PR
mishushakov Jun 17, 2026
7a52f73
Merge remote-tracking branch 'origin/main' into mishushakov/stream-wr…
mishushakov Jun 18, 2026
21045a2
test(python-sdk): make stream connection-leak assertions race-free
mishushakov Jun 18, 2026
eb12c66
fix(js-sdk): cancel underlying body reader when abandoned stream is GC'd
mishushakov Jun 18, 2026
ed277d0
refactor(sdks): replace stream GC nets with an idle-read timeout
mishushakov Jun 18, 2026
40a3103
feat(sdks): add per-chunk idle timeout to streamed reads and writes
mishushakov Jun 18, 2026
2022d3a
fix(python-sdk): run streamed-upload reads and gzip off the event loop
mishushakov Jun 18, 2026
cc2cd57
refactor(sdks): make the JS read idle timeout bound only the wire
mishushakov Jun 18, 2026
d2939b0
docs(python-sdk): scope the FILE_TIMEOUT comment to volume transfers
mishushakov Jun 18, 2026
8c5b640
docs(python-sdk): note envd backstops the streamed-upload per-write t…
mishushakov Jun 19, 2026
b8db6b5
test(sdks): drop redundant streamed-read test coverage
mishushakov Jun 19, 2026
4351aae
refactor(python-sdk): drop the now-unused FILE_TIMEOUT constant
mishushakov Jun 19, 2026
11 changes: 11 additions & 0 deletions .changeset/cuddly-pots-stream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
"e2b": patch
"@e2b/python-sdk": patch
---

Stream uploads instead of buffering streaming input entirely in memory:

- `Sandbox.files.write()` / `write_files()`: `ReadableStream` data (JS, outside the browser) and file-like objects (Python) are streamed to the sandbox, including when `gzip` is enabled (compression now happens chunk by chunk). `useOctetStream`/`use_octet_stream` now defaults to auto-detect — octet-stream is used when any entry is streamable (so streamed uploads aren't silently buffered) and `multipart/form-data` otherwise; browsers always use `multipart/form-data`. Streamed uploads also use a longer transfer timeout instead of the default request timeout, so large uploads aren't cut off.
- `Sandbox.files.read(format="stream")`: the request timeout now bounds only the initial handshake instead of killing the stream while it's being consumed (Python disables the read timeout; JS bounds the handshake and supports `signal` to cancel an in-flight stream). A dropped connection during the stream handshake now surfaces the same typed, health-checked error as non-stream reads.
- Python `Sandbox.files.read(format="stream")`: the response body is now streamed from the sandbox instead of being downloaded into memory before iteration (sync and async).
- JS `Sandbox.files.read()` with `blob` or `stream` format now returns an empty `Blob`/`ReadableStream` for empty files instead of `""`.
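
The auto-detect default described in the first bullet can be read as a small decision function. The sketch below is illustrative only: `resolveUseOctetStream` and its input field names are assumptions made for this example, not SDK API, and the expression is modeled on the one in the JS SDK change in this PR.

```typescript
// Illustrative sketch; the interface and function names are hypothetical.
interface UploadDecisionInput {
  explicit?: boolean // caller-supplied `useOctetStream`, if any
  hasStreamableData: boolean // some entry is a ReadableStream (non-browser)
  useGzip: boolean // gzip was requested for the upload
  supportsOctetStream: boolean // sandbox envd is new enough (0.5.7 or later)
}

function resolveUseOctetStream(input: UploadDecisionInput): boolean {
  // An explicit value overrides auto-detection; gzip always asks for
  // octet-stream; nothing applies if envd cannot accept octet-stream.
  const wanted = (input.explicit ?? input.hasStreamableData) || input.useGzip
  return wanted && input.supportsOctetStream
}
```

Note that an explicit `false` still yields octet-stream when gzip is requested, since gzip only works on the octet-stream path.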
78 changes: 78 additions & 0 deletions packages/js-sdk/src/connectionConfig.ts
@@ -7,6 +7,10 @@ const supportedDomains = ['e2b.app', 'e2b.dev', 'e2b.pro', 'e2b-staging.dev']

export const REQUEST_TIMEOUT_MS = 60_000 // 60 seconds
export const DEFAULT_SANDBOX_TIMEOUT_MS = 300_000 // 300 seconds
// Default timeout for streaming file transfers (uploads/downloads). A streamed
// body can take far longer than a regular request, so it must not inherit the
// short `REQUEST_TIMEOUT_MS`.
export const FILE_TIMEOUT_MS = 3_600_000 // 1 hour
export const KEEPALIVE_PING_INTERVAL_SEC = 50 // 50 seconds

export const KEEPALIVE_PING_HEADER = 'Keepalive-Ping-Interval'
@@ -201,6 +205,80 @@ export function setupRequestController(
return { controller, clearStartTimeout, cleanup }
}

// GC safety net for streamed reads: if the consumer drops a streamed response
// body without reading it to completion or cancelling it, the registered
// cleanup releases the underlying connection when the stream is garbage
// collected. This mirrors the Python SDK's `weakref.finalize` on
// `FileStreamReader`. The held value is the cleanup function, which must not
// reference the stream itself or it would never be collected.
const streamReadFinalizers = new FinalizationRegistry<() => void>((cleanup) =>
cleanup()
)

/**
* Wrap a streaming response body so its pooled connection is released when the
* stream is fully read, cancelled, errors, or (as a GC safety net) abandoned.
*
* The request timeout configured via {@link setupRequestController} bounds only
* the initial handshake; this clears that timeout so consuming the body is not
* killed by it. Call once the handshake has succeeded (after error handling).
*
* @internal
*/
export function wrapStreamWithConnectionCleanup(
body: ReadableStream<Uint8Array> | null,
{
clearStartTimeout,
cleanup,
}: { clearStartTimeout: () => void; cleanup: () => void }
): ReadableStream<Uint8Array> {
clearStartTimeout()

if (!body) {
cleanup()
return new Blob([]).stream()
}

const reader = body.getReader()
const unregisterToken = {}
// Detach the GC finalizer and release the connection. Idempotent via
// `cleanup`, so it's safe to call from multiple stream callbacks.
const release = () => {
streamReadFinalizers.unregister(unregisterToken)
cleanup()
}

const stream = new ReadableStream<Uint8Array>({
async pull(streamController) {
try {
const { done, value } = await reader.read()
if (done) {
streamController.close()
release()
} else {
streamController.enqueue(value)
}
} catch (err) {
release()
streamController.error(err)
}
},
async cancel(reason) {
try {
await reader.cancel(reason)
} finally {
release()
}
},
})

// Release the connection if the consumer abandons the stream without
// reading it to completion or cancelling it.
streamReadFinalizers.register(stream, cleanup, unregisterToken)

return stream
}
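
A minimal sketch of the handshake-only timeout that the wrapper above relies on. The body of `setupRequestController` is not shown in this diff, so `sketchRequestController` is a hypothetical stand-in. It assumes the timeout aborts an `AbortController`, that `clearStartTimeout` cancels only the timer, and that `cleanup` is idempotent and aborts the request.

```typescript
// Hypothetical stand-in for `setupRequestController`; not the SDK's code.
function sketchRequestController(timeoutMs: number) {
  const controller = new AbortController()
  let cleanedUp = false

  // The timer only guards the handshake: it aborts if no response arrives.
  let timer: ReturnType<typeof setTimeout> | undefined = setTimeout(
    () => controller.abort(),
    timeoutMs
  )

  // Called once the response has arrived, so that consuming a long body
  // is not cut short by the handshake timeout.
  const clearStartTimeout = () => {
    if (timer !== undefined) {
      clearTimeout(timer)
      timer = undefined
    }
  }

  // Safe to call from several stream callbacks: only the first call acts.
  const cleanup = () => {
    if (cleanedUp) return
    cleanedUp = true
    clearStartTimeout()
    controller.abort() // assumed: aborting releases the pooled connection
  }

  return { controller, clearStartTimeout, cleanup }
}
```

With this shape, a stream wrapper can call `clearStartTimeout()` right after the handshake and `cleanup()` on completion, cancellation, or error without tracking which path ran first.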

function buildUserAgent(integration?: string) {
const userAgentParts = [`e2b-js-sdk/${version}`]

114 changes: 101 additions & 13 deletions packages/js-sdk/src/sandbox/filesystem/index.ts
@@ -9,10 +9,12 @@ import {
ConnectionConfig,
ConnectionOpts,
defaultUsername,
FILE_TIMEOUT_MS,
KEEPALIVE_PING_HEADER,
KEEPALIVE_PING_INTERVAL_SEC,
setupRequestController,
Username,
wrapStreamWithConnectionCleanup,
} from '../../connectionConfig'

import {
@@ -51,7 +53,7 @@ import {
InvalidArgumentError,
TemplateError,
} from '../../errors'
- import { toBlob, toUploadBody } from '../../utils'
+ import { runtime, toBlob, toUploadBody } from '../../utils'

const FILESYSTEM_HTTP_ERROR_MAP: Record<number, (message: string) => Error> = {
404: (message: string) => new FileNotFoundError(message),
@@ -271,9 +273,15 @@ export interface FilesystemWriteOpts extends FilesystemRequestOpts {
gzip?: boolean
/**
* When true, the upload uses `application/octet-stream` instead of `multipart/form-data`.
+ * Outside the browser, `ReadableStream` data is then streamed to the sandbox
+ * instead of being buffered in memory.
*
- * Defaults to `false`. Requires envd 0.5.7 or later — when not supported by
- * the sandbox's envd version, the upload falls back to `multipart/form-data`.
+ * Defaults to `undefined`, which uses octet-stream when any entry is a
+ * `ReadableStream` (so streamed uploads aren't buffered) and
+ * `multipart/form-data` otherwise; browsers always use `multipart/form-data`
+ * since they can't stream request bodies. Requires envd 0.5.7 or later — when
+ * not supported by the sandbox's envd version, the upload falls back to
+ * `multipart/form-data`.
*/
useOctetStream?: boolean
/**
@@ -412,6 +420,10 @@ export class Filesystem {
*
* You can pass `text`, `bytes`, `blob`, or `stream` to `opts.format` to change the return type.
*
* The request timeout bounds only the initial handshake—the returned
* stream is not killed by it while being consumed. Use `opts.signal` to
* cancel an in-flight stream.
*
* @param path path to the file.
* @param opts connection options.
* @param [opts.format] format of the file content—`stream`.
@@ -443,6 +455,58 @@ export class Filesystem {
headers['Accept-Encoding'] = 'gzip'
}

if (format === 'stream') {
// The request timeout bounds only the initial handshake; once the
// response arrives, the stream lives until it's consumed, cancelled,
// or the user signal aborts.
const { controller, clearStartTimeout, cleanup } = setupRequestController(
opts?.requestTimeoutMs ?? this.connectionConfig.requestTimeoutMs,
opts?.signal
)

try {
const res = await this.envdApi.api
.GET('/files', {
params: {
query: {
path,
username: user,
},
},
parseAs: 'stream',
signal: controller.signal,
headers,
})
.catch(async (err) => {
// Map a dropped connection during the handshake (e.g. killed
// sandbox) to a typed error via the health check, matching the
// non-stream read path below.
throw await handleEnvdApiFetchError(err, this.checkHealth)
})

const err = await handleFilesystemEnvdApiError(res)
if (err) {
// Cancel the unconsumed error body so the pooled connection is
// released before we propagate, matching the Python stream path's
// `r.close()`. `cleanup()`'s abort would also release it, but
// cancelling is explicit and independent of runtime abort semantics.
if (res.response.body && !res.response.bodyUsed) {
await res.response.body.cancel().catch(() => {})
}
cleanup()
throw err
}

return wrapStreamWithConnectionCleanup(
res.data as ReadableStream<Uint8Array> | null,
{ clearStartTimeout, cleanup }
)
} catch (err) {
cleanup()
throw err
}
}

const res = await this.envdApi.api
.GET('/files', {
params: {
@@ -467,13 +531,17 @@ export class Filesystem {
throw err
}

- if (format === 'bytes') {
- return new Uint8Array(res.data as ArrayBuffer)
+ // When the file is empty, the response body is skipped and `res.data` is
+ // `undefined`. Return the proper empty value for the requested format.
+ if (res.response.headers.get('content-length') === '0') {
+ if (format === 'bytes') {
+ return new Uint8Array(0)
+ }
+ return format === 'blob' ? new Blob([]) : ''
}

- // When the file is empty, res.data is parsed as `{}`. This is a workaround to return an empty string.
- if (res.response.headers.get('content-length') === '0') {
- return ''
+ if (format === 'bytes') {
+ return new Uint8Array(res.data as ArrayBuffer)
}

return res.data
@@ -559,11 +627,18 @@ export class Filesystem {

const supportsOctetStream =
compareVersions(this.envdApi.version, ENVD_OCTET_STREAM_UPLOAD) >= 0
- // Gzip compression only works with the octet-stream upload (the
- // Content-Encoding header applies to the whole request body), so
- // requesting gzip implies it when envd supports it.
+ // Streaming a request body only happens on the octet-stream path; the
+ // multipart path buffers via `toBlob`. So default to octet-stream when any
+ // entry is a `ReadableStream`, otherwise a streamed upload would be
+ // silently buffered. Browsers can't stream request bodies, so they stay on
+ // multipart. Gzip also implies octet-stream (the Content-Encoding header
+ // applies to the whole request body). An explicit `useOctetStream` wins.
+ const hasStreamableData =
+ runtime !== 'browser' &&
+ writeFiles.some((file) => file.data instanceof ReadableStream)
const useOctetStream =
- ((writeOpts?.useOctetStream ?? false) || useGzip) && supportsOctetStream
+ ((writeOpts?.useOctetStream ?? hasStreamableData) || useGzip) &&
+ supportsOctetStream

const metadata = writeOpts?.metadata
validateMetadata(metadata)
@@ -593,6 +668,15 @@ export class Filesystem {
writeFiles.map(async (file) => {
const filePath = path ?? (file as WriteEntry).path
const body = await toUploadBody(file.data, useGzip)
const isStream = body instanceof ReadableStream
// A streamed upload can take far longer than the 60s request default,
// so fall back to the file-transfer timeout (matching volume writes)
// unless the caller set one explicitly. The signal is a total
// deadline—unlike downloads there's no post-handshake point to clear
// it, since the response only arrives once the body has been sent.
const uploadTimeoutMs =
writeOpts?.requestTimeoutMs ??
(isStream ? FILE_TIMEOUT_MS : undefined)

const res = await this.envdApi.api
.POST('/files', {
@@ -605,10 +689,14 @@ export class Filesystem {
bodySerializer: () => body,
headers,
signal: this.connectionConfig.getSignal(
- writeOpts?.requestTimeoutMs,
+ uploadTimeoutMs,
writeOpts?.signal
),
body: {},
+ // Streaming request bodies require half-duplex mode.
+ ...(isStream && {
+ duplex: 'half' as const,
+ }),
})
.catch(async (err) => {
throw await handleEnvdApiFetchError(err, this.checkHealth)
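
The upload timeout choice in this file reduces to a two-level fallback. The sketch below restates it in isolation; `pickUploadTimeoutMs` is a hypothetical helper name, and returning `undefined` is assumed to mean "let the connection config apply its default request timeout".

```typescript
// Value taken from connectionConfig.ts in this PR (1 hour).
const FILE_TIMEOUT_MS = 3_600_000

// Hypothetical helper: an explicit caller timeout always wins; otherwise a
// streamed body gets the long file-transfer timeout and buffered bodies
// defer to the default request timeout (represented here by `undefined`).
function pickUploadTimeoutMs(
  explicitMs: number | undefined,
  isStream: boolean
): number | undefined {
  return explicitMs ?? (isStream ? FILE_TIMEOUT_MS : undefined)
}
```

Because the fallback uses `??` rather than `||`, an explicit `0` is kept as given instead of being replaced by the file-transfer timeout.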
12 changes: 10 additions & 2 deletions packages/js-sdk/src/utils.ts
@@ -145,7 +145,11 @@ export function shellQuote(s: string): string {

/**
* Prepare data for upload as a BodyInit, optionally gzip-compressed.
- * When gzip is enabled, compresses the data and returns a Blob.
+ *
+ * Outside the browser, streams (and gzip-compressed data) are returned as
+ * `ReadableStream` so they can be uploaded without buffering in memory.
+ * Browsers don't support streaming request bodies, so data is buffered into
+ * a Blob there.
*/
export async function toUploadBody(
data: string | ArrayBuffer | Blob | ReadableStream,
@@ -159,7 +163,11 @@ export async function toUploadBody(
? data.stream()
: new Blob([data]).stream()
const compressed = stream.pipeThrough(new CompressionStream('gzip'))
- return new Response(compressed).blob()
+ return runtime === 'browser' ? new Response(compressed).blob() : compressed
}

+ if (data instanceof ReadableStream && runtime !== 'browser') {
+ return data
+ }

return toBlob(data)
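
The gzip branch above returns the compressed `ReadableStream` directly outside the browser, so data is compressed chunk by chunk rather than collected into a `Blob` first. The round trip below is a sketch of that pipeline, not the SDK function: `gzipStream` and `gunzipToText` are hypothetical names, and the code assumes a runtime with global `Blob`, `Response`, `CompressionStream`, and `DecompressionStream` (Node 18+ or a modern browser).

```typescript
// Sketch only; helper names are hypothetical and not part of the SDK.

/** Gzip text as a stream instead of materializing the compressed bytes. */
function gzipStream(text: string) {
  return new Blob([text]).stream().pipeThrough(new CompressionStream('gzip'))
}

/** Decompress and decode the stream, to check that the round trip is lossless. */
async function gunzipToText(
  compressed: ReturnType<typeof gzipStream>
): Promise<string> {
  const plain = compressed.pipeThrough(new DecompressionStream('gzip'))
  return new Response(plain).text()
}
```

Usage: `await gunzipToText(gzipStream(content))` should give back `content` unchanged.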
47 changes: 47 additions & 0 deletions packages/js-sdk/tests/sandbox/files/read.test.ts
@@ -38,3 +38,50 @@ sandboxTest('empty file', async ({ sandbox }) => {
const content = await sandbox.files.read(filename)
expect(content).toBe('')
})

sandboxTest('read file as stream', async ({ sandbox }) => {
const filename = 'test_read_stream.txt'
const content = 'Streamed read content. '.repeat(10_000)

await sandbox.files.write(filename, content)
const stream = await sandbox.files.read(filename, { format: 'stream' })

const chunks: Uint8Array[] = []
for await (const chunk of stream as unknown as AsyncIterable<Uint8Array>) {
chunks.push(chunk)
}
const readContent = Buffer.concat(chunks).toString('utf-8')
assert.equal(readContent, content)
})

sandboxTest('read non-existing file as stream', async ({ sandbox }) => {
const filename = 'non_existing_file.txt'

await expect(
sandbox.files.read(filename, { format: 'stream' })
).rejects.toThrowError(FileNotFoundError)
})

sandboxTest('read empty file in all formats', async ({ sandbox }) => {
const filename = 'empty-file-formats.txt'
await sandbox.commands.run(`touch ${filename}`)

const text = await sandbox.files.read(filename, { format: 'text' })
expect(text).toBe('')

const bytes = await sandbox.files.read(filename, { format: 'bytes' })
expect(bytes).toBeInstanceOf(Uint8Array)
expect(bytes.length).toBe(0)

const blob = await sandbox.files.read(filename, { format: 'blob' })
expect(blob).toBeInstanceOf(Blob)
expect(blob.size).toBe(0)

const stream = await sandbox.files.read(filename, { format: 'stream' })
expect(stream).toBeInstanceOf(ReadableStream)
const chunks: Uint8Array[] = []
for await (const chunk of stream as unknown as AsyncIterable<Uint8Array>) {
chunks.push(chunk)
}
expect(Buffer.concat(chunks).length).toBe(0)
})
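
The expectations in the last test can be summarized as a mapping from read format to its empty value. The sketch below is a summary for readers, not SDK code: `emptyValueFor` and `ReadFormat` are hypothetical names, and the `stream` case reuses the `new Blob([]).stream()` expression that appears in `wrapStreamWithConnectionCleanup`.

```typescript
// Hypothetical summary helper; not part of the SDK.
type ReadFormat = 'text' | 'bytes' | 'blob' | 'stream'

function emptyValueFor(format: ReadFormat) {
  switch (format) {
    case 'bytes':
      return new Uint8Array(0) // zero-length byte array
    case 'blob':
      return new Blob([]) // Blob with size 0
    case 'stream':
      return new Blob([]).stream() // ReadableStream that yields no data
    default:
      return '' // text format keeps the empty string
  }
}
```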