Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions hail/python/hailtop/aiocloud/aioaws/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,13 @@ def __str__(self) -> str:


class S3AsyncFS(AsyncFS):
"""Async filesystem implementation for Amazon S3.

The boto3 S3 client is synchronous, so this implementation runs blocking
S3 operations in a thread pool while exposing the shared :class:`AsyncFS`
coroutine interface for ``s3://`` URLs.
"""

def __init__(
self,
thread_pool: Optional[ThreadPoolExecutor] = None,
Expand Down
8 changes: 8 additions & 0 deletions hail/python/hailtop/aiocloud/aioazure/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,14 @@ async def wrapped(self: 'AzureAsyncFS', url, *args, **kwargs):


class AzureAsyncFS(AsyncFS):
"""Async filesystem implementation for Azure Blob Storage.

This implementation accepts
``https://<account>.blob.core.windows.net/<container>/<path>`` URLs,
including URLs with SAS token query strings. It owns Azure Blob service
clients keyed by account, container, and credential context.
"""

PATH_REGEX = re.compile('/(?P<container>[^/]+)(?P<name>.*)')

def __init__(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,15 @@ def __str__(self) -> str:


class GoogleStorageAsyncFS(AsyncFS):
"""Async filesystem implementation for Google Cloud Storage.

This implementation handles ``gs://`` URLs, uses
:class:`GoogleStorageClient` for metadata and object operations, and
supports requester-pays configuration through the client passed at
construction time. ``bucket_allow_list`` records buckets already known to
be acceptable hot-storage locations.
"""

def __init__(
self,
*,
Expand Down
38 changes: 38 additions & 0 deletions hail/python/hailtop/aiotools/fs/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,12 +252,27 @@ def __truediv__(self, part: str) -> 'AsyncFSURL':


class AsyncFS(abc.ABC):
"""Abstract asynchronous filesystem interface.

Implementations expose URL-addressed file operations for one storage
backend. Public methods accept URL strings in the schemes returned by
:meth:`schemes`; they return byte streams, status objects, directory
entries, or booleans without interpreting file contents.

Filesystem instances may own network clients or thread pools, so callers
should either close them explicitly or use them as async context managers.
The public :meth:`open_from` method handles common zero-length reads and
delegates non-empty ranged reads to the implementation-private
:meth:`_open_from`.
"""

FILE = "file"
DIR = "dir"

@staticmethod
@abc.abstractmethod
def schemes() -> Set[str]:
    """Report the URL schemes this filesystem accepts.

    Abstract: every concrete filesystem supplies its own set.
    """

@staticmethod
Expand All @@ -269,18 +284,22 @@ def copy_part_size(url: str) -> int: # pylint: disable=unused-argument
@staticmethod
@abc.abstractmethod
def valid_url(url: str) -> bool:
    """Tell whether this filesystem is able to handle ``url``.

    Abstract: every concrete filesystem supplies its own check.
    """

@staticmethod
@abc.abstractmethod
def parse_url(url: str, *, error_if_bucket: bool = False) -> AsyncFSURL:
    """Turn ``url`` into the URL object specific to this filesystem.

    NOTE(review): ``error_if_bucket`` presumably asks for an error when
    ``url`` names only a bucket -- confirm against the implementations.
    """

@abc.abstractmethod
async def open(self, url: str) -> ReadableStream:
    """Open ``url`` for reading, positioned at its first byte."""

async def open_from(self, url: str, start: int, *, length: Optional[int] = None) -> ReadableStream:
"""Open ``url`` for reading at ``start`` with an optional byte length."""
if length == 0:
fs_url = self.parse_url(url)
if fs_url.path.endswith("/"):
Expand All @@ -301,36 +320,44 @@ async def open_from(self, url: str, start: int, *, length: Optional[int] = None)

@abc.abstractmethod
async def _open_from(self, url: str, start: int, *, length: Optional[int] = None) -> ReadableStream:
    """Backend hook behind :meth:`open_from` for non-empty ranged reads.

    Takes the same ``url``, ``start`` and optional ``length`` arguments as
    the public method.
    """

@abc.abstractmethod
async def create(self, url: str, *, retry_writes: bool = True) -> AsyncContextManager[WritableStream]:
    """Open ``url`` for writing; any existing object there is replaced.

    The awaited result is an async context manager over the writable stream.

    NOTE(review): ``retry_writes`` presumably lets the backend retry failed
    writes itself -- confirm against the implementations.
    """

@abc.abstractmethod
async def multi_part_create(self, sema: asyncio.Semaphore, url: str, num_parts: int) -> MultiPartCreate:
    """Create ``url`` from ``num_parts`` independently written byte ranges.

    NOTE(review): ``sema`` presumably bounds the backend's concurrent work
    -- confirm against the implementations.
    """

@abc.abstractmethod
async def mkdir(self, url: str) -> None:
    """Create the directory that ``url`` names, where the backend supports it."""

@abc.abstractmethod
async def makedirs(self, url: str, exist_ok: bool = False) -> None:
    """Create the directory that ``url`` names along with any missing parents.

    NOTE(review): ``exist_ok`` presumably suppresses the error for an
    already-existing directory -- confirm against the implementations.
    """

@abc.abstractmethod
async def statfile(self, url: str) -> FileStatus:
    """Fetch the status of the file that ``url`` names.

    :meth:`exists` treats a ``FileNotFoundError`` raised here as "the file
    is absent".
    """

@abc.abstractmethod
async def listfiles(
    self, url: str, recursive: bool = False, exclude_trailing_slash_files: bool = True
) -> AsyncIterator[FileListEntry]:
    """Enumerate the entries beneath the directory that ``url`` names.

    NOTE(review): ``recursive`` presumably descends into subdirectories and
    ``exclude_trailing_slash_files`` presumably hides objects whose names
    end in ``/`` -- confirm against the implementations.
    """

@abc.abstractmethod
async def staturl(self, url: str) -> str:
    """Classify ``url`` as :attr:`FILE` or :attr:`DIR` and return that constant."""

async def _staturl_parallel_isfile_isdir(self, url: str) -> str:
Expand All @@ -357,14 +384,17 @@ async def _staturl_parallel_isfile_isdir(self, url: str) -> str:

@abc.abstractmethod
async def isfile(self, url: str) -> bool:
    """Tell whether a file exists at ``url``."""

@abc.abstractmethod
async def isdir(self, url: str) -> bool:
    """Tell whether a directory exists at ``url``."""

@abc.abstractmethod
async def remove(self, url: str) -> None:
    """Delete the file that ``url`` names."""

async def _remove_doesnt_exist_ok(self, url):
Expand All @@ -376,6 +406,7 @@ async def _remove_doesnt_exist_ok(self, url):
async def rmtree(
self, sema: Optional[asyncio.Semaphore], url: str, listener: Optional[Callable[[int], None]] = None
) -> None:
"""Remove the directory tree represented by ``url`` if it exists."""
if listener is None:
listener = lambda _: None
if sema is None:
Expand All @@ -398,37 +429,44 @@ async def rm(entry: FileListEntry):
await pool.wait(tasks)

async def touch(self, url: str) -> None:
    """Leave an empty file at ``url``.

    A writable stream is obtained from :meth:`create` and closed again
    without anything being written to it, so a file already at ``url`` is
    replaced just as :meth:`create` replaces it.
    """
    writer = await self.create(url)
    async with writer:
        pass

async def read(self, url: str) -> bytes:
    """Load everything stored at ``url`` into memory and return it.

    The stream comes from :meth:`open` and is closed before returning.
    """
    stream = await self.open(url)
    async with stream as reader:
        return await reader.read()

async def read_from(self, url: str, start: int) -> bytes:
    """Return whatever a stream opened on ``url`` at byte offset ``start`` yields.

    The stream comes from :meth:`open_from` with no length limit and is
    closed before returning.
    """
    stream = await self.open_from(url, start)
    async with stream as reader:
        return await reader.read()

async def read_range(self, url: str, start: int, end: int, *, end_inclusive=True) -> bytes:
    """Read the bytes of ``url`` lying between offsets ``start`` and ``end``.

    The byte at ``end`` is part of the result when ``end_inclusive`` is
    truthy (the default) and left out otherwise. Exactly that many bytes
    are requested from :meth:`open_from` and read with ``readexactly``.
    """
    span = end - start
    if end_inclusive:
        # An inclusive end adds one more byte to the span.
        span += 1
    stream = await self.open_from(url, start, length=span)
    async with stream as reader:
        return await reader.readexactly(span)

async def write(self, url: str, data: bytes) -> None:
    """Store ``data`` at ``url``, replacing whatever file is already there.

    The whole create-and-write attempt is handed to
    ``retry_transient_errors``.
    """

    async def _write() -> None:
        # retry_writes=False: the attempt as a whole is passed to
        # retry_transient_errors below instead.
        sink = await self.create(url, retry_writes=False)
        async with sink as out:
            await out.write(data)

    await retry_transient_errors(_write)

async def exists(self, url: str) -> bool:
    """Tell whether a file exists at ``url``.

    Existence is probed with :meth:`statfile`: a ``FileNotFoundError`` means
    ``False`` and a successful call means ``True``. Any other exception
    propagates to the caller.
    """
    try:
        await self.statfile(url)
    except FileNotFoundError:
        found = False
    else:
        found = True
    return found

async def close(self) -> None:
    """Release the resources this filesystem owns.

    This base implementation does nothing.
    """
    return None

async def __aenter__(self) -> Self:
Expand Down
8 changes: 8 additions & 0 deletions hail/python/hailtop/aiotools/router_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@


class RouterAsyncFS(AsyncFS):
"""Route each URL to the concrete :class:`AsyncFS` for its scheme.

RouterAsyncFS supports local paths, Google Cloud Storage, Terra Azure,
Azure Blob Storage, and Amazon S3. Backend filesystem instances are created
lazily from the corresponding ``*_kwargs`` and owned by the router; closing
the router closes every backend that was used.
"""

FS_CLASSES: ClassVar[List[type[AsyncFS]]] = [
LocalAsyncFS,
aiogoogle.GoogleStorageAsyncFS,
Expand Down