diff --git a/.changeset/volumes-stream.md b/.changeset/volumes-stream.md new file mode 100644 index 0000000000..8c0d7445d6 --- /dev/null +++ b/.changeset/volumes-stream.md @@ -0,0 +1,9 @@ +--- +"e2b": patch +"@e2b/python-sdk": patch +--- + +Stream volume file uploads and downloads instead of buffering in memory: + +- `Volume.writeFile()` / `Volume.write_file()`: `ReadableStream` data (JS, outside the browser) and file-like objects (Python) are now streamed to the API in chunks. +- `Volume.readFile(format="stream")` / `read_file(format="stream")`: the request timeout now bounds only the initial handshake instead of killing the stream while it's being consumed (Python disables the read timeout; JS bounds the handshake and supports `signal` to cancel an in-flight stream). A dropped connection during the stream handshake now surfaces the same typed, health-checked error as non-stream reads. diff --git a/packages/js-sdk/src/index.ts b/packages/js-sdk/src/index.ts index c86a9b545c..c7a80f8fc8 100644 --- a/packages/js-sdk/src/index.ts +++ b/packages/js-sdk/src/index.ts @@ -110,10 +110,14 @@ export type { VolumeInfo, VolumeAndToken, VolumeEntryStat, - VolumeMetadataOptions, - VolumeWriteOptions, + VolumeMetadataOpts, + VolumeReadOpts, + VolumeWriteOpts, VolumeApiOpts, VolumeConnectionConfig, + // Deprecated aliases, kept for backwards compatibility. + VolumeMetadataOptions, + VolumeWriteOptions, } from './volume' export { Sandbox } diff --git a/packages/js-sdk/src/volume/index.ts b/packages/js-sdk/src/volume/index.ts index 7a09e74ec3..4bad00dd2d 100644 --- a/packages/js-sdk/src/volume/index.ts +++ b/packages/js-sdk/src/volume/index.ts @@ -6,16 +6,22 @@ import { VolumeApiOpts, FILE_TIMEOUT_MS, } from './client' -import { ConnectionConfig, ConnectionOpts } from '../connectionConfig' +import { + ConnectionConfig, + ConnectionOpts, + setupRequestController, + wrapStreamWithConnectionCleanup, +} from '../connectionConfig' import { NotFoundError, VolumeError } from '../errors' -import { toBlob } from '../utils' +import { runtime, toBlob } from '../utils' import { VolumeFileType } from './types' import type { VolumeAndToken, VolumeEntryStat, VolumeInfo, - VolumeMetadataOptions, - VolumeWriteOptions, + VolumeMetadataOpts, + VolumeReadOpts, + VolumeWriteOpts, } from './types' /** @@ -310,7 +316,7 @@ export class Volume { */ async makeDir( path: string, - opts?: VolumeWriteOptions & VolumeApiOpts + opts?: VolumeWriteOpts & VolumeApiOpts ): Promise { const config = new VolumeConnectionConfig(this, opts) const client = new VolumeApiClient(config) @@ -425,7 +431,7 @@ export class Volume { */ async updateMetadata( path: string, - metadata: VolumeMetadataOptions, + metadata: VolumeMetadataOpts, opts?: VolumeApiOpts ): Promise { const config = new VolumeConnectionConfig(this, opts) @@ -479,7 +485,7 @@ export class Volume { */ async readFile( path: string, - opts?: VolumeApiOpts & { format?: 'text' } + opts?: VolumeReadOpts & { format?: 'text' } ): Promise /** * Read file content as a `Uint8Array`. @@ -494,7 +500,7 @@ export class Volume { */ async readFile( path: string, - opts?: VolumeApiOpts & { format: 'bytes' } + opts?: VolumeReadOpts & { format: 'bytes' } ): Promise /** * Read file content as a `Blob`. @@ -509,7 +515,7 @@ export class Volume { */ async readFile( path: string, - opts?: VolumeApiOpts & { format: 'blob' } + opts?: VolumeReadOpts & { format: 'blob' } ): Promise /** * Read file content as a `ReadableStream`. @@ -524,11 +530,11 @@ export class Volume { */ async readFile( path: string, - opts?: VolumeApiOpts & { format: 'stream' } + opts?: VolumeReadOpts & { format: 'stream' } ): Promise> async readFile( path: string, - opts?: VolumeApiOpts & { + opts?: VolumeReadOpts & { format?: 'text' | 'stream' | 'bytes' | 'blob' } ): Promise { @@ -539,6 +545,60 @@ export class Volume { }) const client = new VolumeApiClient(config) + if (format === 'stream') { + // The request timeout bounds only the initial handshake; once the + // response arrives, the stream lives until it's consumed, cancelled, the + // user signal aborts, or the per-chunk idle timeout fires. Matches the + // sandbox `files.read` stream path. + const { controller, clearStartTimeout, cleanup } = setupRequestController( + config.requestTimeoutMs, + opts?.signal + ) + + try { + const res = await client.api.GET('/volumecontent/{volumeID}/file', { + params: { + path: { volumeID: this.volumeId }, + query: { path }, + }, + parseAs: 'stream', + signal: controller.signal, + }) + + if (res.response.status === 404) { + // Cancel the unconsumed body so the pooled connection is released + // before we propagate. + if (res.response.body && !res.response.bodyUsed) { + await res.response.body.cancel().catch(() => {}) + } + cleanup() + throw new NotFoundError(`Path ${path} not found`) + } + + const err = handleApiError(res, VolumeError) + if (err) { + if (res.response.body && !res.response.bodyUsed) { + await res.response.body.cancel().catch(() => {}) + } + cleanup() + throw err + } + + return wrapStreamWithConnectionCleanup( + res.data as ReadableStream | null, + { + clearStartTimeout, + cleanup, + controller, + idleTimeoutMs: opts?.streamIdleTimeoutMs ?? config.requestTimeoutMs, + } + ) + } catch (err) { + cleanup() + throw err + } + } + const res = await client.api.GET('/volumecontent/{volumeID}/file', { params: { path: { @@ -572,11 +632,8 @@ export class Volume { return typeof res.data === 'string' ? res.data : '' } - if (format === 'blob') { - return res.data instanceof Blob ? res.data : new Blob([]) - } - - return res.data instanceof ReadableStream ? res.data : new Blob([]).stream() + // format === 'blob' + return res.data instanceof Blob ? res.data : new Blob([]) } /** @@ -587,7 +644,7 @@ export class Volume { * Writing to a file that already exists overwrites the file. * * @param path path to the file. - * @param data data to write to the file. Data can be a string, `ArrayBuffer`, `Blob`, or `ReadableStream`. + * @param data data to write to the file. Data can be a string, `ArrayBuffer`, `Blob`, or `ReadableStream`. Outside the browser, `ReadableStream` data is streamed to the API instead of being buffered in memory. * @param options file creation options. * @param opts connection options. * @@ -596,7 +653,7 @@ export class Volume { async writeFile( path: string, data: string | ArrayBuffer | Blob | ReadableStream, - opts?: VolumeWriteOptions & VolumeApiOpts + opts?: VolumeWriteOpts & VolumeApiOpts ): Promise { const config = new VolumeConnectionConfig(this, { ...opts, @@ -604,7 +661,15 @@ export class Volume { }) const client = new VolumeApiClient(config) - const blob = await toBlob(data) + // Browsers don't support streaming request bodies, so buffer there. + const isStream = data instanceof ReadableStream && runtime !== 'browser' + const body = isStream ? data : await toBlob(data) + + // A streamed upload carries no client-side timeout: the socket-write + // "wire" isn't observable through fetch, and a stalled producer is the + // caller's own code, so a stuck streamed upload is bounded server-side (or + // via `opts.signal`). Buffered uploads keep the normal request timeout. + const signal = isStream ? opts?.signal : config.getSignal() const res = await client.api.PUT('/volumecontent/{volumeID}/file', { params: { @@ -619,12 +684,14 @@ export class Volume { force: opts?.force, }, }, - bodySerializer: () => blob, + bodySerializer: () => body, body: {} as any, headers: { 'Content-Type': 'application/octet-stream', }, - signal: config.getSignal(), + signal, + // Streaming request bodies require half-duplex mode. + ...(isStream && { duplex: 'half' as const }), }) if (res.response.status === 404) { @@ -682,6 +749,10 @@ export type { VolumeInfo, VolumeAndToken, VolumeEntryStat, + VolumeMetadataOpts, + VolumeReadOpts, + VolumeWriteOpts, + // Deprecated aliases, kept for backwards compatibility. VolumeMetadataOptions, VolumeWriteOptions, } from './types' diff --git a/packages/js-sdk/src/volume/types.ts b/packages/js-sdk/src/volume/types.ts index 7f5043ece0..e5a9526d2f 100644 --- a/packages/js-sdk/src/volume/types.ts +++ b/packages/js-sdk/src/volume/types.ts @@ -1,4 +1,4 @@ -import { VolumeApiComponents } from './client' +import { VolumeApiComponents, VolumeApiOpts } from './client' /** * File type enum. @@ -66,7 +66,7 @@ export type VolumeEntryStat = Omit< /** * Options for updating file metadata. */ -export type VolumeMetadataOptions = { +export type VolumeMetadataOpts = { /** * User ID of the file or directory. */ @@ -83,13 +83,37 @@ export type VolumeMetadataOptions = { mode?: number } +/** + * Options for reading files from a volume. + */ +export type VolumeReadOpts = VolumeApiOpts & { + /** + * Idle timeout for a streamed read (`format: 'stream'`) in **milliseconds**: + * abort if no chunk arrives from the server within this window *while + * reading*. It bounds only the wire — a slow or paused consumer never trips + * it (a consumer that holds the stream but stops reading is reclaimed + * server-side). Defaults to the request timeout; pass `0` to disable. + */ + streamIdleTimeoutMs?: number +} + /** * Options for file and directory operations. */ -export type VolumeWriteOptions = VolumeMetadataOptions & { +export type VolumeWriteOpts = VolumeMetadataOpts & { /** * For makeDir: Create parent directories if they don't exist. * For writeFile: Force overwrite of an existing file. */ force?: boolean } + +/** + * @deprecated Use {@link VolumeMetadataOpts} instead. + */ +export type VolumeMetadataOptions = VolumeMetadataOpts + +/** + * @deprecated Use {@link VolumeWriteOpts} instead. + */ +export type VolumeWriteOptions = VolumeWriteOpts diff --git a/packages/js-sdk/tests/sandbox/files/read.test.ts b/packages/js-sdk/tests/sandbox/files/read.test.ts index b4f481c310..85b8f45b4e 100644 --- a/packages/js-sdk/tests/sandbox/files/read.test.ts +++ b/packages/js-sdk/tests/sandbox/files/read.test.ts @@ -77,3 +77,50 @@ sandboxTest('read empty file in all formats', async ({ sandbox }) => { } expect(Buffer.concat(chunks).length).toBe(0) }) + +sandboxTest('read file as stream', async ({ sandbox }) => { + const filename = 'test_read_stream.txt' + const content = 'Streamed read content. '.repeat(10_000) + + await sandbox.files.write(filename, content) + const stream = await sandbox.files.read(filename, { format: 'stream' }) + + const chunks: Uint8Array[] = [] + for await (const chunk of stream as unknown as AsyncIterable) { + chunks.push(chunk) + } + const readContent = Buffer.concat(chunks).toString('utf-8') + assert.equal(readContent, content) +}) + +sandboxTest('read non-existing file as stream', async ({ sandbox }) => { + const filename = 'non_existing_file.txt' + + await expect( + sandbox.files.read(filename, { format: 'stream' }) + ).rejects.toThrowError(FileNotFoundError) +}) + +sandboxTest('read empty file in all formats', async ({ sandbox }) => { + const filename = 'empty-file-formats.txt' + await sandbox.commands.run(`touch ${filename}`) + + const text = await sandbox.files.read(filename, { format: 'text' }) + expect(text).toBe('') + + const bytes = await sandbox.files.read(filename, { format: 'bytes' }) + expect(bytes).toBeInstanceOf(Uint8Array) + expect(bytes.length).toBe(0) + + const blob = await sandbox.files.read(filename, { format: 'blob' }) + expect(blob).toBeInstanceOf(Blob) + expect(blob.size).toBe(0) + + const stream = await sandbox.files.read(filename, { format: 'stream' }) + expect(stream).toBeInstanceOf(ReadableStream) + const chunks: Uint8Array[] = [] + for await (const chunk of stream as unknown as AsyncIterable) { + chunks.push(chunk) + } + expect(Buffer.concat(chunks).length).toBe(0) +}) diff --git a/packages/js-sdk/tests/volume/file.test.ts b/packages/js-sdk/tests/volume/file.test.ts index f15131b7eb..5819b332be 100644 --- a/packages/js-sdk/tests/volume/file.test.ts +++ b/packages/js-sdk/tests/volume/file.test.ts @@ -58,6 +58,20 @@ describe('Volume File Operations', () => { } ) + volumeTest( + 'should write and read a file from a ReadableStream', + async ({ volume }) => { + const path = '/test-stream.txt' + const content = 'Test stream content' + const stream = new Blob([content]).stream() + + await volume.writeFile(path, stream) + const readContent = await volume.readFile(path, { format: 'text' }) + + expect(readContent).toBe(content) + } + ) + volumeTest('should write and read an empty file', async ({ volume }) => { const path = '/empty.txt' const content = '' @@ -68,6 +82,30 @@ describe('Volume File Operations', () => { expect(readContent).toBe(content) }) + volumeTest( + 'should read an empty file in all formats', + async ({ volume }) => { + const path = '/empty-formats.txt' + await volume.writeFile(path, '') + + const bytes = await volume.readFile(path, { format: 'bytes' }) + expect(bytes).toBeInstanceOf(Uint8Array) + expect(bytes.length).toBe(0) + + const blob = await volume.readFile(path, { format: 'blob' }) + expect(blob).toBeInstanceOf(Blob) + expect(blob.size).toBe(0) + + const stream = await volume.readFile(path, { format: 'stream' }) + expect(stream).toBeInstanceOf(ReadableStream) + const chunks: Uint8Array[] = [] + for await (const chunk of stream as unknown as AsyncIterable) { + chunks.push(chunk) + } + expect(chunks.reduce((n, c) => n + c.length, 0)).toBe(0) + } + ) + volumeTest( 'should overwrite an existing file with force option', async ({ volume }) => { diff --git a/packages/python-sdk/e2b/volume/connection_config.py b/packages/python-sdk/e2b/volume/connection_config.py index 4fc95367ed..a0f27ec733 100644 --- a/packages/python-sdk/e2b/volume/connection_config.py +++ b/packages/python-sdk/e2b/volume/connection_config.py @@ -8,6 +8,10 @@ from e2b.api.metadata import package_version REQUEST_TIMEOUT: float = 60.0 # 60 seconds + +# Timeout for volume file transfers, which stream large bodies and so must not +# inherit the short REQUEST_TIMEOUT. (Sandbox filesystem streaming instead +# bounds each chunk by the request timeout and leaves the total to the server.) FILE_TIMEOUT: float = 3600.0 # 1 hour diff --git a/packages/python-sdk/e2b/volume/volume_async.py b/packages/python-sdk/e2b/volume/volume_async.py index c8932af9af..3ced6a63ed 100644 --- a/packages/python-sdk/e2b/volume/volume_async.py +++ b/packages/python-sdk/e2b/volume/volume_async.py @@ -46,6 +46,7 @@ VolumeInfo, VolumeEntryStat, ) +from e2b.io_utils import aiter_io_chunks from e2b.volume.utils import DualMethod, convert_volume_entry_stat @@ -444,6 +445,7 @@ async def read_file( self, path: str, format: Literal["stream"], + stream_idle_timeout: Optional[float] = None, **opts: Unpack[VolumeApiParams], ) -> AsyncIterator[bytes]: ... @@ -451,6 +453,7 @@ async def read_file( self, path: str, format: Literal["text", "bytes", "stream"] = "text", + stream_idle_timeout: Optional[float] = None, **opts: Unpack[VolumeApiParams], ) -> Union[str, bytes, AsyncIterator[bytes]]: """ @@ -460,6 +463,11 @@ async def read_file( :param path: Path to the file :param format: Format of the file content—`text` by default + :param stream_idle_timeout: Idle timeout in **seconds** for a streamed + read (`format="stream"`)—abort if no chunk arrives within this + window while reading. Resets on every chunk, so it bounds a stalled + stream without limiting total transfer time. Defaults to the request + timeout; pass `0` to disable. :param opts: Connection options :return: File content as string, bytes, or async iterator of bytes @@ -473,13 +481,23 @@ async def read_file( ) if format == "stream": + # The request timeout bounds connection setup, not total transfer; + # consuming the body must not be killed by it. httpx's per-chunk + # `read` timeout becomes the idle-read timeout for the body + # (defaults to the request timeout), bounding a stalled stream + # without limiting total transfer time. Pass `0` to disable. + # Mirrors the sandbox files stream path. + idle_timeout = ( + timeout if stream_idle_timeout is None else stream_idle_timeout + ) + stream_timeout = httpx.Timeout(timeout, read=idle_timeout or None) async def stream_file() -> AsyncIterator[bytes]: async with api_client.get_async_httpx_client().stream( method="GET", url=f"/volumecontent/{self._volume_id}/file", params=params, - timeout=timeout, + timeout=stream_timeout, ) as response: if response.status_code == 404: raise NotFoundException(f"Path {path} not found") @@ -525,7 +543,7 @@ async def stream_file() -> AsyncIterator[bytes]: async def write_file( self, path: str, - data: Union[str, bytes, IO[bytes]], + data: Union[str, bytes, IO], uid: Optional[int] = None, gid: Optional[int] = None, mode: Optional[int] = None, @@ -539,7 +557,7 @@ async def write_file( Writing to a file that already exists overwrites the file. :param path: Path to the file - :param data: Data to write to the file. Data can be a string, bytes, or IO. + :param data: Data to write to the file. Data can be a string, bytes, or IO. File-like objects are streamed in chunks instead of being buffered in memory. :param uid: User ID of the created file :param gid: Group ID of the created file :param mode: Mode of the created file @@ -556,22 +574,21 @@ async def write_file( if upload_timeout is not None: api_client = api_client.with_timeout(httpx.Timeout(upload_timeout)) + content: Union[bytes, AsyncIterator[bytes]] if isinstance(data, str): - data_bytes = data.encode("utf-8") + content = data.encode("utf-8") elif isinstance(data, bytes): - data_bytes = data + content = data elif hasattr(data, "read"): - content = data.read() - if isinstance(content, bytes): - data_bytes = content - else: - data_bytes = content.encode("utf-8") + # Stream file-like objects in chunks without buffering them in + # memory. Async httpx requires an async iterable request body. + content = aiter_io_chunks(data) else: raise ValueError(f"Unsupported data type: {type(data)}") res = await put_file.asyncio_detailed( self._volume_id, - body=FilePayload(payload=data_bytes), # type: ignore[arg-type] # Pass bytes directly for async httpx compatibility + body=FilePayload(payload=content), # type: ignore[arg-type] # httpx accepts bytes and streamable content directly path=path, uid=uid if uid is not None else UNSET, gid=gid if gid is not None else UNSET, diff --git a/packages/python-sdk/e2b/volume/volume_sync.py b/packages/python-sdk/e2b/volume/volume_sync.py index 94a4f5be60..8bcf221858 100644 --- a/packages/python-sdk/e2b/volume/volume_sync.py +++ b/packages/python-sdk/e2b/volume/volume_sync.py @@ -1,3 +1,4 @@ +import io from typing import IO, Iterator, List, Literal, Optional, Union, cast, overload from http import HTTPStatus @@ -46,6 +47,7 @@ VolumeInfo, VolumeEntryStat, ) +from e2b.io_utils import iter_io_chunks from e2b.volume.utils import DualMethod, convert_volume_entry_stat @@ -442,6 +444,7 @@ def read_file( self, path: str, format: Literal["stream"], + stream_idle_timeout: Optional[float] = None, **opts: Unpack[VolumeApiParams], ) -> Iterator[bytes]: ... @@ -449,6 +452,7 @@ def read_file( self, path: str, format: Literal["text", "bytes", "stream"] = "text", + stream_idle_timeout: Optional[float] = None, **opts: Unpack[VolumeApiParams], ) -> Union[str, bytes, Iterator[bytes]]: """ @@ -458,6 +462,11 @@ def read_file( :param path: Path to the file :param format: Format of the file content—`text` by default + :param stream_idle_timeout: Idle timeout in **seconds** for a streamed + read (`format="stream"`)—abort if no chunk arrives within this + window while reading. Resets on every chunk, so it bounds a stalled + stream without limiting total transfer time. Defaults to the request + timeout; pass `0` to disable. :param opts: Connection options :return: File content as string, bytes, or iterator of bytes @@ -471,13 +480,23 @@ def read_file( ) if format == "stream": + # The request timeout bounds connection setup, not total transfer; + # consuming the body must not be killed by it. httpx's per-chunk + # `read` timeout becomes the idle-read timeout for the body + # (defaults to the request timeout), bounding a stalled stream + # without limiting total transfer time. Pass `0` to disable. + # Mirrors the sandbox files stream path. + idle_timeout = ( + timeout if stream_idle_timeout is None else stream_idle_timeout + ) + stream_timeout = httpx.Timeout(timeout, read=idle_timeout or None) def stream_file() -> Iterator[bytes]: with api_client.get_httpx_client().stream( method="GET", url=f"/volumecontent/{self._volume_id}/file", params=params, - timeout=timeout, + timeout=stream_timeout, ) as response: if response.status_code == 404: raise NotFoundException(f"Path {path} not found") @@ -522,7 +541,7 @@ def stream_file() -> Iterator[bytes]: def write_file( self, path: str, - data: Union[str, bytes, IO[bytes]], + data: Union[str, bytes, IO], uid: Optional[int] = None, gid: Optional[int] = None, mode: Optional[int] = None, @@ -536,7 +555,7 @@ def write_file( Writing to a file that already exists overwrites the file. :param path: Path to the file - :param data: Data to write to the file. Data can be a string, bytes, or IO. + :param data: Data to write to the file. Data can be a string, bytes, or IO. File-like objects are streamed in chunks instead of being buffered in memory. :param uid: User ID of the created file :param gid: Group ID of the created file :param mode: Mode of the created file @@ -553,22 +572,23 @@ def write_file( if upload_timeout is not None: api_client = api_client.with_timeout(httpx.Timeout(upload_timeout)) + content: Union[bytes, IO[bytes], Iterator[bytes]] if isinstance(data, str): - data_bytes = data.encode("utf-8") + content = data.encode("utf-8") elif isinstance(data, bytes): - data_bytes = data + content = data + elif isinstance(data, io.TextIOBase): + # Text-mode IO yields str chunks—encode them while streaming. + content = iter_io_chunks(data) elif hasattr(data, "read"): - content = data.read() - if isinstance(content, bytes): - data_bytes = content - else: - data_bytes = content.encode("utf-8") + # httpx streams file-like objects in chunks without buffering. + content = data else: raise ValueError(f"Unsupported data type: {type(data)}") res = put_file.sync_detailed( self._volume_id, - body=FilePayload(payload=data_bytes), # type: ignore[arg-type] # Pass bytes directly for sync httpx compatibility + body=FilePayload(payload=content), # type: ignore[arg-type] # httpx accepts bytes and streamable content directly path=path, uid=uid if uid is not None else UNSET, gid=gid if gid is not None else UNSET, diff --git a/packages/python-sdk/tests/async/volume_async/test_file.py b/packages/python-sdk/tests/async/volume_async/test_file.py index 014eb33a2c..f901dcb434 100644 --- a/packages/python-sdk/tests/async/volume_async/test_file.py +++ b/packages/python-sdk/tests/async/volume_async/test_file.py @@ -1,5 +1,5 @@ import datetime -from io import BytesIO +from io import BytesIO, StringIO import pytest @@ -50,6 +50,16 @@ async def test_write_and_read_stream(self, async_volume: AsyncVolume): assert read_content == content + async def test_write_and_read_text_stream(self, async_volume: AsyncVolume): + path = "/test-text-stream.txt" + content = "Test text stream content" + stream = StringIO(content) + + await async_volume.write_file(path, stream) + read_content = await async_volume.read_file(path, format="text") + + assert read_content == content + async def test_write_and_read_empty_file(self, async_volume: AsyncVolume): path = "/empty.txt" content = "" diff --git a/packages/python-sdk/tests/sync/volume_sync/test_file.py b/packages/python-sdk/tests/sync/volume_sync/test_file.py index c51db81691..eb2dfa0c0a 100644 --- a/packages/python-sdk/tests/sync/volume_sync/test_file.py +++ b/packages/python-sdk/tests/sync/volume_sync/test_file.py @@ -1,5 +1,5 @@ import datetime -from io import BytesIO +from io import BytesIO, StringIO import pytest @@ -47,6 +47,16 @@ def test_write_and_read_stream(self, volume: Volume): assert read_content == content + def test_write_and_read_text_stream(self, volume: Volume): + path = "/test-text-stream.txt" + content = "Test text stream content" + stream = StringIO(content) + + volume.write_file(path, stream) + read_content = volume.read_file(path, format="text") + + assert read_content == content + def test_write_and_read_empty_file(self, volume: Volume): path = "/empty.txt" content = ""