diff --git a/batch/pinned-requirements.txt b/batch/pinned-requirements.txt index 8d8c2bdb7e2..38becf37fbb 100644 --- a/batch/pinned-requirements.txt +++ b/batch/pinned-requirements.txt @@ -50,7 +50,7 @@ frozenlist==1.8.0 # -c batch/../web_common/pinned-requirements.txt # aiohttp # aiosignal -idna==3.11 +idna==3.12 # via # -c batch/../gear/pinned-requirements.txt # -c batch/../hail/python/dev/pinned-requirements.txt @@ -70,7 +70,7 @@ numpy==2.2.6 # -c batch/../hail/python/dev/pinned-requirements.txt # -c batch/../hail/python/pinned-requirements.txt # pandas -packaging==26.0 +packaging==26.1 # via # -c batch/../gear/pinned-requirements.txt # -c batch/../hail/python/dev/pinned-requirements.txt diff --git a/ci/pinned-requirements.txt b/ci/pinned-requirements.txt index 6e7aad414d3..4603859551a 100644 --- a/ci/pinned-requirements.txt +++ b/ci/pinned-requirements.txt @@ -32,7 +32,7 @@ distro==1.9.0 # via zulip gidgethub==5.4.0 # via -r ci/requirements.txt -idna==3.11 +idna==3.12 # via # -c ci/../gear/pinned-requirements.txt # -c ci/../hail/python/dev/pinned-requirements.txt diff --git a/gear/pinned-requirements.txt b/gear/pinned-requirements.txt index 45aca9efb72..a974d667cb1 100644 --- a/gear/pinned-requirements.txt +++ b/gear/pinned-requirements.txt @@ -67,7 +67,10 @@ frozenlist==1.8.0 # aiohttp # aiosignal google-api-core==2.30.3 - # via google-api-python-client + # via + # -c gear/../hail/python/hailtop/pinned-requirements.txt + # -c gear/../hail/python/pinned-requirements.txt + # google-api-python-client google-api-python-client==2.194.0 # via google-cloud-profiler google-auth==2.49.2 @@ -85,12 +88,15 @@ google-auth-httplib2==0.3.1 google-cloud-profiler==4.1.0 # via -r gear/requirements.txt googleapis-common-protos==1.74.0 - # via google-api-core + # via + # -c gear/../hail/python/hailtop/pinned-requirements.txt + # -c gear/../hail/python/pinned-requirements.txt + # google-api-core httplib2==0.31.2 # via # google-api-python-client # google-auth-httplib2 -idna==3.11 +idna==3.12 # via # -c gear/../hail/python/dev/pinned-requirements.txt # -c gear/../hail/python/hailtop/pinned-requirements.txt @@ -106,7 +112,7 @@ multidict==6.7.1 # -c gear/../hail/python/pinned-requirements.txt # aiohttp # yarl -packaging==26.0 +packaging==26.1 # via # -c gear/../hail/python/dev/pinned-requirements.txt # -c gear/../hail/python/pinned-requirements.txt @@ -126,9 +132,14 @@ propcache==0.4.1 # aiohttp # yarl proto-plus==1.27.2 - # via google-api-core + # via + # -c gear/../hail/python/hailtop/pinned-requirements.txt + # -c gear/../hail/python/pinned-requirements.txt + # google-api-core protobuf==7.34.1 # via + # -c gear/../hail/python/hailtop/pinned-requirements.txt + # -c gear/../hail/python/pinned-requirements.txt # google-api-core # google-cloud-profiler # googleapis-common-protos diff --git a/hail/python/dev/pinned-requirements.txt b/hail/python/dev/pinned-requirements.txt index 1bd42708948..5f24cfa78c0 100644 --- a/hail/python/dev/pinned-requirements.txt +++ b/hail/python/dev/pinned-requirements.txt @@ -128,7 +128,7 @@ executing==2.2.1 # stack-data fastjsonschema==2.21.2 # via nbformat -filelock==3.25.2 +filelock==3.29.0 # via # python-discovery # virtualenv @@ -149,9 +149,9 @@ httpcore==1.0.9 # via httpx httpx==0.28.1 # via jupyterlab -identify==2.6.18 +identify==2.6.19 # via pre-commit -idna==3.11 +idna==3.12 # via # -c hail/python/dev/../pinned-requirements.txt # anyio @@ -232,7 +232,7 @@ jupyter-core==5.9.1 # nbconvert # nbformat # notebook -jupyter-events==0.12.0 +jupyter-events==0.12.1 # via jupyter-server jupyter-lsp==2.3.1 # via jupyterlab @@ -309,7 +309,7 @@ nodeenv==1.10.0 # via # pre-commit # pyright -nodejs-wheel-binaries==24.14.1 +nodejs-wheel-binaries==24.15.0 # via pyright notebook==6.5.7 # via @@ -326,7 +326,7 @@ numpy==2.2.6 # matplotlib overrides==7.7.0 # via jupyter-server -packaging==26.0 +packaging==26.1 # via # -c hail/python/dev/../pinned-requirements.txt # build @@ -631,7 +631,7 @@ uv==0.10.12 # via -r hail/python/dev/requirements.txt uv-build==0.10.12 # via -r hail/python/dev/requirements.txt -virtualenv==21.2.1 +virtualenv==21.2.4 # via pre-commit watchfiles==1.1.1 # via aiohttp-devtools @@ -651,5 +651,5 @@ yarl==1.23.0 # via # -c hail/python/dev/../pinned-requirements.txt # aiohttp -zipp==3.23.0 +zipp==3.23.1 # via importlib-metadata diff --git a/hail/python/hailtop/aiocloud/aiogoogle/client/storage_client.py b/hail/python/hailtop/aiocloud/aiogoogle/client/storage_client.py index 68526ea12cc..36fa8f04739 100644 --- a/hail/python/hailtop/aiocloud/aiogoogle/client/storage_client.py +++ b/hail/python/hailtop/aiocloud/aiogoogle/client/storage_client.py @@ -3,15 +3,39 @@ import logging import os import urllib.parse +from concurrent.futures.thread import ThreadPoolExecutor from contextlib import AsyncExitStack from types import TracebackType -from typing import Any, AsyncIterator, Callable, Coroutine, Dict, List, MutableMapping, Optional, Set, Tuple, Type, cast +from typing import ( + Any, + AsyncIterator, + Callable, + Coroutine, + Dict, + List, + MutableMapping, + Optional, + Set, + Tuple, + Type, + cast, +) import aiohttp +from google.auth.aio.credentials import AnonymousCredentials +from google.cloud.storage import Client, transfer_manager +from google.oauth2 import service_account +from google.oauth2.credentials import Credentials from multidict import CIMultiDictProxy # pylint: disable=unused-import # pylint: disable=unused-import from hailtop import timex -from hailtop.aiotools import FeedableAsyncIterable, WriteBuffer +from hailtop.aiocloud.common import AnonymousCloudCredentials +from hailtop.aiotools import ( + FeedableAsyncIterable, + LocalAsyncFS, + WeightedSemaphore, + WriteBuffer, +) from hailtop.aiotools.fs import ( AsyncFS, AsyncFSFactory, @@ -25,10 +49,17 @@ UnexpectedEOFError, WritableStream, ) -from hailtop.utils import OnlineBoundedGather2, TransientError, retry_transient_errors, secret_alnum_string +from hailtop.utils import ( + OnlineBoundedGather2, + TransientError, + async_to_blocking, + blocking_to_async, + retry_transient_errors, + secret_alnum_string, +) from ...common.session import BaseSession -from ..credentials import GoogleCredentials +from ..credentials import GoogleCredentials, GoogleServiceAccountCredentials from ..user_config import GCSRequesterPaysConfiguration, get_gcs_requester_pays_configuration from .base_client import GoogleBaseClient @@ -315,14 +346,47 @@ async def _wait_closed(self) -> None: class GoogleStorageClient(GoogleBaseClient): - def __init__(self, gcs_requester_pays_configuration: Optional[GCSRequesterPaysConfiguration] = None, **kwargs): + CHUNK_SIZE = 8 * 1024 * 1024 + MAX_WORKERS = 8 + + def __init__( + self, + gcs_requester_pays_configuration: Optional[GCSRequesterPaysConfiguration] = None, + thread_pool: Optional[ThreadPoolExecutor] = None, + **kwargs, + ): if 'timeout' not in kwargs and 'http_session' not in kwargs: # Around May 2022, GCS started timing out a lot with our default 5s timeout kwargs['timeout'] = aiohttp.ClientTimeout(total=20) + + timeout = kwargs.get('timeout') + if isinstance(timeout, aiohttp.ClientTimeout): + self._timeout = timeout.total + elif isinstance(timeout, (int, float)): + self._timeout = timeout + else: + self._timeout = 20 + + if not thread_pool: + thread_pool = ThreadPoolExecutor() + self._thread_pool = thread_pool + super().__init__('https://storage.googleapis.com/storage/v1', **kwargs) self._gcs_requester_pays_configuration = get_gcs_requester_pays_configuration( gcs_requester_pays_configuration=gcs_requester_pays_configuration ) + credentials = kwargs.get('credentials') + if isinstance(credentials, GoogleServiceAccountCredentials): + gcp_credentials = service_account.Credentials.from_service_account_info(info=credentials.key) + elif isinstance(credentials, GoogleCredentials): + access_token = async_to_blocking(credentials.access_token()) + gcp_credentials = Credentials(token=access_token) + elif isinstance(credentials, AnonymousCloudCredentials): + gcp_credentials = AnonymousCredentials() + else: + gcp_credentials = None + + self._client = Client(credentials=gcp_credentials) async def bucket_info(self, bucket: str) -> Dict[str, Any]: """ @@ -415,6 +479,44 @@ async def list_objects(self, bucket: str, **kwargs) -> PageIterator: self._update_params_with_user_project(kwargs, bucket) return PageIterator(self, f'/b/{bucket}/o', kwargs) + async def download_single_file(self, bucket: str, filename: str, dest: str) -> None: + user_project = self._get_user_project_for_bucket(bucket) + bucket_instance = self._client.bucket(bucket, user_project=user_project) + blob = bucket_instance.blob(filename) + + dest_parent = os.path.dirname(dest) + if dest_parent: + if os.path.exists(dest_parent) and not os.path.isdir(dest_parent): + raise NotADirectoryError(dest_parent) + os.makedirs(dest_parent, exist_ok=True) + await blocking_to_async( + self._thread_pool, + blob.download_to_filename, + dest, + single_shot_download=True, + timeout=self._timeout, + ) + + async def download_large_file(self, bucket: str, src: str, dest: str) -> None: + user_project = self._get_user_project_for_bucket(bucket) + bucket_instance = self._client.bucket(bucket, user_project=user_project) + blob = bucket_instance.blob(src) + + dest_parent = os.path.dirname(dest) + if dest_parent: + if os.path.exists(dest_parent) and not os.path.isdir(dest_parent): + raise NotADirectoryError(dest_parent) + os.makedirs(dest_parent, exist_ok=True) + await blocking_to_async( + self._thread_pool, + transfer_manager.download_chunks_concurrently, + blob=blob, + filename=dest, + chunk_size=self.CHUNK_SIZE, + download_kwargs={'timeout': self._timeout}, + max_workers=self.MAX_WORKERS, + ) + async def compose(self, bucket: str, names: List[str], destination: str, **kwargs) -> None: assert destination n = len(names) @@ -455,6 +557,18 @@ def _update_params_with_user_project(self, request_kwargs, bucket): if bucket in buckets: params.update({'userProject': project}) + def _get_user_project_for_bucket(self, bucket: str) -> Optional[str]: + if isinstance(self._gcs_requester_pays_configuration, str): + return self._gcs_requester_pays_configuration + elif isinstance(self._gcs_requester_pays_configuration, tuple): + project, buckets = self._gcs_requester_pays_configuration + if bucket in buckets: + return project + else: + return None + else: + return None + class GetObjectFileStatus(FileStatus): def __init__(self, items: Dict[str, str], url: str): @@ -646,6 +760,8 @@ def __init__( if bucket_allow_list is None: bucket_allow_list = [] self.allowed_storage_locations = bucket_allow_list + self._sema = WeightedSemaphore(self._storage_client.MAX_WORKERS * 10) + self._xfer_sema = WeightedSemaphore(self._storage_client.CHUNK_SIZE * 10) @staticmethod def schemes() -> Set[str]: @@ -875,6 +991,57 @@ async def remove(self, url: str) -> None: raise FileNotFoundError(url) from e raise + async def copy_between_fs( + self, + srcfile: str, + srcstat: FileStatus, + destfile: str, + **kwargs, + ): + if LocalAsyncFS.valid_url(destfile): + if kwargs.get('sema'): + sema = cast(WeightedSemaphore, kwargs.get('sema')) + else: + sema = self._sema + if kwargs.get('xfer_sema'): + xfer_sema = cast(WeightedSemaphore, kwargs.get('xfer_sema')) + else: + xfer_sema = self._xfer_sema + + size = await srcstat.size() + if destfile.startswith('file://'): + local_dest = destfile[len('file://') :] + else: + local_dest = destfile + if size > self._storage_client.CHUNK_SIZE: + async with sema.acquire_manager(self._storage_client.MAX_WORKERS): + await self._copy_single_large_file(xfer_sema, srcfile, local_dest) + else: + await self._copy_single_local_file(xfer_sema, srcfile, local_dest, size) + else: + raise NotImplementedError + + async def _copy_single_local_file( + self, + xfer_sema: WeightedSemaphore, + src: str, + dest: str, + size: int, + ) -> None: + async with xfer_sema.acquire_manager(size): + bucket, name = self.get_bucket_and_name(src) + await retry_transient_errors(self._storage_client.download_single_file, bucket, name, dest) + + async def _copy_single_large_file( + self, + xfer_sema: WeightedSemaphore, + src: str, + dest: str, + ) -> None: + async with xfer_sema.acquire_manager(self._storage_client.CHUNK_SIZE * self._storage_client.MAX_WORKERS): + bucket, name = self.get_bucket_and_name(src) + await retry_transient_errors(self._storage_client.download_large_file, bucket, name, dest) + async def copy_within_gcs( self, src: str, dest: str, callback: Optional[Callable[[Dict[str, Any], bool], None]] = None ) -> None: diff --git a/hail/python/hailtop/aiotools/copy.py b/hail/python/hailtop/aiotools/copy.py index 7520fa85a2b..b2e60751c7c 100644 --- a/hail/python/hailtop/aiotools/copy.py +++ b/hail/python/hailtop/aiotools/copy.py @@ -12,16 +12,16 @@ from .. import uvloopx from ..utils.rich_progress_bar import CopyToolProgressBar, make_listener from ..utils.utils import sleep_before_try -from . import Copier, Transfer +from . import Copier, Transfer, WeightedSemaphore from .router_fs import RouterAsyncFS -class GrowingSempahore(AsyncContextManager[asyncio.Semaphore]): +class GrowingSemaphore(AsyncContextManager[WeightedSemaphore]): def __init__(self, start_max: int, target_max: int, progress_and_tid: Optional[Tuple[Progress, TaskID]]): + self.sema = WeightedSemaphore(start_max) self.task: Optional[asyncio.Task] = None self.target_max = target_max self.current_max = start_max - self.sema = asyncio.Semaphore(self.current_max) self.progress_and_tid = progress_and_tid async def _grow(self): @@ -34,28 +34,23 @@ async def _grow(self): ) new_max = min(int(self.current_max * 1.5), self.target_max) diff = new_max - self.current_max - self.sema._value += diff - self.sema._wake_up_next() + self.sema.release(diff) self.current_max = new_max if self.progress_and_tid: progress, tid = self.progress_and_tid progress.update(tid, advance=diff) - async def __aenter__(self) -> asyncio.Semaphore: + async def __aenter__(self) -> WeightedSemaphore: self.task = asyncio.create_task(self._grow()) - await self.sema.__aenter__() return self.sema - async def __aexit__(self, exc_type, exc, tb): - try: - await self.sema.__aexit__(exc_type, exc, tb) - finally: - if self.task is not None: - if self.task.done() and not self.task.cancelled(): - if exc := self.task.exception(): - raise exc - else: - self.task.cancel() + async def __aexit__(self, exc_type, exc_val, exc_tb): + if self.task is not None: + if self.task.done() and not self.task.cancelled(): + if exc := self.task.exception(): + raise exc + else: + self.task.cancel() async def copy( @@ -76,6 +71,11 @@ async def copy( if 'thread_pool' not in local_kwargs: local_kwargs['thread_pool'] = thread_pool + if gcs_kwargs is None: + gcs_kwargs = {} + if 'thread_pool' not in gcs_kwargs: + gcs_kwargs['thread_pool'] = thread_pool + if s3_kwargs is None: s3_kwargs = {} if 'thread_pool' not in s3_kwargs: @@ -94,7 +94,7 @@ async def copy( total=max_simultaneous_transfers, visible=verbose, ) - async with GrowingSempahore( + async with GrowingSemaphore( initial_simultaneous_transfers, max_simultaneous_transfers, (progress, parallelism_tid) ) as sema: file_tid = progress.add_task(description='files', total=0, visible=verbose) diff --git a/hail/python/hailtop/aiotools/fs/copier.py b/hail/python/hailtop/aiotools/fs/copier.py index 187865acdf6..e92069c793a 100644 --- a/hail/python/hailtop/aiotools/fs/copier.py +++ b/hail/python/hailtop/aiotools/fs/copier.py @@ -264,7 +264,7 @@ async def _copy_part( async def _copy_file_multi_part_main( self, - sema: asyncio.Semaphore, + sema: WeightedSemaphore, source_report: SourceReport, srcfile: str, srcstat: FileStatus, @@ -308,7 +308,7 @@ async def f(i): async def _copy_file_multi_part( self, - sema: asyncio.Semaphore, + sema: WeightedSemaphore, source_report: SourceReport, srcfile: str, srcstat: FileStatus, @@ -317,7 +317,18 @@ async def _copy_file_multi_part( ) -> None: success = False try: - await self._copy_file_multi_part_main(sema, source_report, srcfile, srcstat, destfile, return_exceptions) + try: + await self.router_fs.copy_between_fs( + srcfile, + srcstat, + destfile, + sema=sema, + xfer_sema=self.xfer_sema, + ) + except NotImplementedError: + await self._copy_file_multi_part_main( + sema, source_report, srcfile, srcstat, destfile, return_exceptions + ) success = True except Exception as e: if return_exceptions: @@ -347,7 +358,7 @@ async def _full_dest(self): async def copy_as_file( self, - sema: asyncio.Semaphore, # pylint: disable=unused-argument + sema: WeightedSemaphore, # pylint: disable=unused-argument source_report: SourceReport, return_exceptions: bool, ): @@ -379,7 +390,7 @@ async def copy_as_file( source_report.start_bytes(await srcstat.size()) await self._copy_file_multi_part(sema, source_report, src, srcstat, full_dest, return_exceptions) - async def copy_as_dir(self, sema: asyncio.Semaphore, source_report: SourceReport, return_exceptions: bool): + async def copy_as_dir(self, sema: WeightedSemaphore, source_report: SourceReport, return_exceptions: bool): async def files_iterator() -> AsyncIterator[FileListEntry]: return await self.router_fs.listfiles(src, recursive=True) @@ -450,7 +461,7 @@ async def create_copies() -> Tuple[List[Callable[[], Awaitable[None]]], int]: source_report.start_bytes(bytes_to_copy) await bounded_gather2(sema, *copies, cancel_on_error=True) - async def copy(self, sema: asyncio.Semaphore, source_report: SourceReport, return_exceptions: bool): + async def copy(self, sema: WeightedSemaphore, source_report: SourceReport, return_exceptions: bool): try: # gather with return_exceptions=True to make copy # deterministic with respect to exceptions @@ -496,7 +507,7 @@ class Copier: @staticmethod async def copy( fs: AsyncFS, - sema: asyncio.Semaphore, + sema: WeightedSemaphore, transfer: Union[Transfer, List[Transfer]], return_exceptions: bool = False, *, @@ -538,7 +549,7 @@ async def _dest_type(self, transfer: Transfer): async def copy_source( self, - sema: asyncio.Semaphore, + sema: WeightedSemaphore, transfer: Transfer, source_report: SourceReport, src: str, @@ -551,7 +562,7 @@ async def copy_source( await src_copier.copy(sema, source_report, return_exceptions) async def _copy_one_transfer( - self, sema: asyncio.Semaphore, transfer_report: TransferReport, transfer: Transfer, return_exceptions: bool + self, sema: WeightedSemaphore, transfer_report: TransferReport, transfer: Transfer, return_exceptions: bool ): try: if transfer.treat_dest_as == Transfer.INFER_DEST: @@ -593,7 +604,7 @@ async def _copy_one_transfer( async def _copy( self, - sema: asyncio.Semaphore, + sema: WeightedSemaphore, copy_report: CopyReport, transfer: Union[Transfer, List[Transfer]], return_exceptions: bool, diff --git a/hail/python/hailtop/aiotools/fs/fs.py b/hail/python/hailtop/aiotools/fs/fs.py index 6aa97dfe362..6ccc7198c8d 100644 --- a/hail/python/hailtop/aiotools/fs/fs.py +++ b/hail/python/hailtop/aiotools/fs/fs.py @@ -397,6 +397,15 @@ async def rm(entry: FileListEntry): if tasks: await pool.wait(tasks) + async def copy_between_fs( + self, + srcfile: str, + srcstat: FileStatus, + destfile: str, + **kwargs, + ): + raise NotImplementedError + async def touch(self, url: str) -> None: async with await self.create(url): pass diff --git a/hail/python/hailtop/aiotools/router_fs.py b/hail/python/hailtop/aiotools/router_fs.py index fecf48cf009..e2005a359cd 100644 --- a/hail/python/hailtop/aiotools/router_fs.py +++ b/hail/python/hailtop/aiotools/router_fs.py @@ -106,6 +106,16 @@ async def _get_fs(self, url: str): return self._s3_fs raise ValueError(f'no file system found for url {url}') + async def copy_between_fs( + self, + srcfile: str, + srcstat: FileStatus, + destfile: str, + **kwargs, + ): + fs = await self._get_fs(srcfile) + await fs.copy_between_fs(srcfile, srcstat, destfile, **kwargs) + async def open(self, url: str) -> ReadableStream: fs = await self._get_fs(url) return await fs.open(url) diff --git a/hail/python/hailtop/aiotools/weighted_semaphore.py b/hail/python/hailtop/aiotools/weighted_semaphore.py index 85b5ad89e99..381e11bd4b3 100644 --- a/hail/python/hailtop/aiotools/weighted_semaphore.py +++ b/hail/python/hailtop/aiotools/weighted_semaphore.py @@ -1,9 +1,11 @@ -from asyncio import Event +from asyncio import Event, Semaphore from types import TracebackType -from typing import Optional, Type, cast +from typing import Literal, Optional, Type, TypeVar, cast from sortedcontainers import SortedKeyList +T = TypeVar('T') # pylint: disable=invalid-name + class _AcquireManager: def __init__(self, ws: 'WeightedSemaphore', n: int): @@ -20,23 +22,30 @@ async def __aexit__( self._ws.release(self._n) -class WeightedSemaphore: +class WeightedSemaphore(Semaphore): def __init__(self, value: int): + super().__init__(value) self.max = value - self.value = value + self._value = value self.events = SortedKeyList(key=lambda x: x[0]) - def release(self, n: int) -> None: - self.value += n + def locked(self): + return self._value == 0 and (any(not event.is_set() for n, event in (self.events or ()))) + + def release(self, n: int = 1) -> None: + self._value += n + self._wake_up_next() + + def _wake_up_next(self): while self.events: _n, _event = self.events[0] # cast to int / Event: _n = cast(int, _n) _event = cast(Event, _event) - if self.value >= _n: + if self._value >= _n: self.events.pop(0) - self.value -= _n + self._value -= _n _event.set() else: break @@ -44,12 +53,13 @@ def release(self, n: int) -> None: def acquire_manager(self, n: int) -> _AcquireManager: return _AcquireManager(self, n) - async def acquire(self, n: int) -> None: + async def acquire(self, n: int = 1) -> Literal[True]: assert n <= self.max - if self.value >= n: - self.value -= n - return + if self._value >= n: + self._value -= n + return True event = Event() self.events.add((n, event)) await event.wait() + return True diff --git a/hail/python/hailtop/fs/router_fs.py b/hail/python/hailtop/fs/router_fs.py index cbc1b02e1a1..ee603d162c1 100644 --- a/hail/python/hailtop/fs/router_fs.py +++ b/hail/python/hailtop/fs/router_fs.py @@ -20,6 +20,7 @@ from hailtop.aiotools.router_fs import RouterAsyncFS from hailtop.utils import async_to_blocking, bounded_gather2 +from ..aiotools import WeightedSemaphore from .fs import FS from .stat_result import FileListEntry, FileType @@ -238,7 +239,7 @@ def copy(self, src: str, dest: str, *, max_simultaneous_transfers=75): transfer = Transfer(src, dest) async def _copy(): - sema = asyncio.Semaphore(max_simultaneous_transfers) + sema = WeightedSemaphore(max_simultaneous_transfers) await Copier.copy(self.afs, sema, transfer) return async_to_blocking(_copy()) diff --git a/hail/python/hailtop/pinned-requirements.txt b/hail/python/hailtop/pinned-requirements.txt index c980835b9e2..1a8b6671215 100644 --- a/hail/python/hailtop/pinned-requirements.txt +++ b/hail/python/hailtop/pinned-requirements.txt @@ -30,9 +30,9 @@ azure-mgmt-storage==20.1.0 # via -r hail/python/hailtop/requirements.txt azure-storage-blob==12.28.0 # via -r hail/python/hailtop/requirements.txt -boto3==1.42.87 +boto3==1.42.92 # via -r hail/python/hailtop/requirements.txt -botocore==1.42.87 +botocore==1.42.92 # via # -r hail/python/hailtop/requirements.txt # boto3 @@ -65,15 +65,34 @@ frozenlist==1.8.0 # -r hail/python/hailtop/requirements.txt # aiohttp # aiosignal +google-api-core==2.30.3 + # via + # google-cloud-core + # google-cloud-storage google-auth==2.49.2 # via # -r hail/python/hailtop/requirements.txt + # google-api-core # google-auth-oauthlib + # google-cloud-core + # google-cloud-storage google-auth-oauthlib==0.8.0 # via -r hail/python/hailtop/requirements.txt +google-cloud-core==2.5.1 + # via google-cloud-storage +google-cloud-storage==3.10.1 + # via -r hail/python/hailtop/requirements.txt +google-crc32c==1.8.0 + # via + # google-cloud-storage + # google-resumable-media +google-resumable-media==2.8.2 + # via google-cloud-storage +googleapis-common-protos==1.74.0 + # via google-api-core humanize==4.15.0 # via -r hail/python/hailtop/requirements.txt -idna==3.11 +idna==3.12 # via # requests # yarl @@ -111,6 +130,13 @@ propcache==0.4.1 # via # aiohttp # yarl +proto-plus==1.27.2 + # via google-api-core +protobuf==7.34.1 + # via + # google-api-core + # googleapis-common-protos + # proto-plus pyasn1==0.6.3 # via pyasn1-modules pyasn1-modules==0.4.2 @@ -132,6 +158,8 @@ pyyaml==6.0.3 requests==2.33.1 # via # azure-core + # google-api-core + # google-cloud-storage # msal # msrest # requests-oauthlib diff --git a/hail/python/hailtop/requirements.txt b/hail/python/hailtop/requirements.txt index 87da8980f42..a9a3b0023ed 100644 --- a/hail/python/hailtop/requirements.txt +++ b/hail/python/hailtop/requirements.txt @@ -9,6 +9,7 @@ dill>=0.4.0,<0.5 frozenlist>=1.3.1,<2 google-auth>=2.14.1,<3 google-auth-oauthlib>=0.5.2,<1 +google-cloud-storage>=3.8.0 humanize>=4.0,<5 janus>=0.6,<1.1 nest_asyncio>=1.5.8,<2 diff --git a/hail/python/pinned-requirements.txt b/hail/python/pinned-requirements.txt index 1ceea3cb562..7ae57965ef2 100644 --- a/hail/python/pinned-requirements.txt +++ b/hail/python/pinned-requirements.txt @@ -59,11 +59,11 @@ azure-storage-blob==12.28.0 # -r hail/python/hailtop/requirements.txt bokeh==3.9.0 # via -r hail/python/requirements.txt -boto3==1.42.87 +boto3==1.42.92 # via # -c hail/python/hailtop/pinned-requirements.txt # -r hail/python/hailtop/requirements.txt -botocore==1.42.87 +botocore==1.42.92 # via # -c hail/python/hailtop/pinned-requirements.txt # -r hail/python/hailtop/requirements.txt @@ -115,20 +115,49 @@ frozenlist==1.8.0 # -r hail/python/hailtop/requirements.txt # aiohttp # aiosignal +google-api-core==2.30.3 + # via + # -c hail/python/hailtop/pinned-requirements.txt + # google-cloud-core + # google-cloud-storage google-auth==2.49.2 # via # -c hail/python/hailtop/pinned-requirements.txt # -r hail/python/hailtop/requirements.txt + # google-api-core # google-auth-oauthlib + # google-cloud-core + # google-cloud-storage google-auth-oauthlib==0.8.0 # via # -c hail/python/hailtop/pinned-requirements.txt # -r hail/python/hailtop/requirements.txt +google-cloud-core==2.5.1 + # via + # -c hail/python/hailtop/pinned-requirements.txt + # google-cloud-storage +google-cloud-storage==3.10.1 + # via + # -c hail/python/hailtop/pinned-requirements.txt + # -r hail/python/hailtop/requirements.txt +google-crc32c==1.8.0 + # via + # -c hail/python/hailtop/pinned-requirements.txt + # google-cloud-storage + # google-resumable-media +google-resumable-media==2.8.2 + # via + # -c hail/python/hailtop/pinned-requirements.txt + # google-cloud-storage +googleapis-common-protos==1.74.0 + # via + # -c hail/python/hailtop/pinned-requirements.txt + # google-api-core humanize==4.15.0 # via # -c hail/python/hailtop/pinned-requirements.txt # -r hail/python/hailtop/requirements.txt -idna==3.11 +idna==3.12 # via # -c hail/python/hailtop/pinned-requirements.txt # requests @@ -173,7 +202,7 @@ multidict==6.7.1 # -c hail/python/hailtop/pinned-requirements.txt # aiohttp # yarl -narwhals==2.19.0 +narwhals==2.20.0 # via bokeh nest-asyncio==1.6.0 # via @@ -194,7 +223,7 @@ orjson==3.11.8 # via # -c hail/python/hailtop/pinned-requirements.txt # -r hail/python/hailtop/requirements.txt -packaging==26.0 +packaging==26.1 # via # bokeh # plotly @@ -211,6 +240,16 @@ propcache==0.4.1 # -c hail/python/hailtop/pinned-requirements.txt # aiohttp # yarl +proto-plus==1.27.2 + # via + # -c hail/python/hailtop/pinned-requirements.txt + # google-api-core +protobuf==7.34.1 + # via + # -c hail/python/hailtop/pinned-requirements.txt + # google-api-core + # googleapis-common-protos + # proto-plus py4j==0.10.9.7 # via pyspark pyasn1==0.6.3 @@ -264,6 +303,8 @@ requests==2.33.1 # -c hail/python/hailtop/pinned-requirements.txt # -r hail/python/requirements.txt # azure-core + # google-api-core + # google-cloud-storage # msal # msrest # requests-oauthlib diff --git a/hail/python/test/hailtop/inter_cloud/conftest.py b/hail/python/test/hailtop/inter_cloud/conftest.py index 0336614c875..c4ae1a2cb1c 100644 --- a/hail/python/test/hailtop/inter_cloud/conftest.py +++ b/hail/python/test/hailtop/inter_cloud/conftest.py @@ -1,4 +1,3 @@ -import asyncio import functools import os import secrets @@ -6,12 +5,13 @@ import pytest +from hailtop.aiotools import WeightedSemaphore from hailtop.aiotools.router_fs import AsyncFS, RouterAsyncFS from hailtop.utils import bounded_gather2 @pytest.fixture(scope='module') -async def router_filesystem() -> AsyncIterator[Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]]]: +async def router_filesystem() -> AsyncIterator[Tuple[WeightedSemaphore, AsyncFS, Dict[str, str]]]: token = secrets.token_hex(16) async with RouterAsyncFS() as fs: @@ -26,7 +26,7 @@ async def router_filesystem() -> AsyncIterator[Tuple[asyncio.Semaphore, AsyncFS, bases = {'file': file_base, 'gs': gs_base, 's3': s3_base} - sema = asyncio.Semaphore(50) + sema = WeightedSemaphore(50) async with sema: yield (sema, fs, bases) await bounded_gather2( diff --git a/hail/python/test/hailtop/inter_cloud/generate_copy_test_specs.py b/hail/python/test/hailtop/inter_cloud/generate_copy_test_specs.py index f9d11fe5d1f..22591cbd9ed 100644 --- a/hail/python/test/hailtop/inter_cloud/generate_copy_test_specs.py +++ b/hail/python/test/hailtop/inter_cloud/generate_copy_test_specs.py @@ -3,7 +3,7 @@ import secrets from concurrent.futures import ThreadPoolExecutor -from hailtop.aiotools import Copier, Transfer +from hailtop.aiotools import Copier, Transfer, WeightedSemaphore from hailtop.aiotools.router_fs import RouterAsyncFS @@ -137,7 +137,7 @@ async def copy_test_specs(): async with await fs.create(f'{dest_base}keep'): pass - sema = asyncio.Semaphore(50) + sema = WeightedSemaphore(50) async with sema: result = await run_test_spec(sema, fs, config, src_base, dest_base) config['result'] = result diff --git a/hail/python/test/hailtop/inter_cloud/test_copy.py b/hail/python/test/hailtop/inter_cloud/test_copy.py index 05d56add6cc..c3acc8e6664 100644 --- a/hail/python/test/hailtop/inter_cloud/test_copy.py +++ b/hail/python/test/hailtop/inter_cloud/test_copy.py @@ -1,10 +1,9 @@ -import asyncio import secrets from typing import AsyncIterator, Dict, List, Tuple import pytest -from hailtop.aiotools import AsyncFS, Copier, FileAndDirectoryError, FileListEntry, Transfer +from hailtop.aiotools import AsyncFS, Copier, FileAndDirectoryError, FileListEntry, Transfer, WeightedSemaphore from hailtop.utils import url_scheme from .copy_test_specs import COPY_TEST_SPECS @@ -42,7 +41,7 @@ async def cloud_scheme(request): 's3/s3', ] ) -async def copy_test_context(request, router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]]): +async def copy_test_context(request, router_filesystem: Tuple[WeightedSemaphore, AsyncFS, Dict[str, str]]): sema, fs, bases = router_filesystem [src_scheme, dest_scheme] = request.param.split('/') @@ -355,7 +354,7 @@ async def test_file_overwrite_dir(copy_test_context): async def test_file_and_directory_error( - router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme: str + router_filesystem: Tuple[WeightedSemaphore, AsyncFS, Dict[str, str]], cloud_scheme: str ): sema, fs, bases = router_filesystem @@ -394,7 +393,7 @@ async def collect_files(it: AsyncIterator[FileListEntry]) -> List[str]: async def test_file_and_directory_error_with_slash_empty_file( - router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme: str + router_filesystem: Tuple[WeightedSemaphore, AsyncFS, Dict[str, str]], cloud_scheme: str ): sema, fs, bases = router_filesystem @@ -429,7 +428,7 @@ async def test_file_and_directory_error_with_slash_empty_file( async def test_file_and_directory_error_with_slash_non_empty_file_for_google_non_recursive( - router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], + router_filesystem: Tuple[WeightedSemaphore, AsyncFS, Dict[str, str]], ): _, fs, bases = router_filesystem @@ -446,7 +445,7 @@ async def test_file_and_directory_error_with_slash_non_empty_file_for_google_non async def test_file_and_directory_error_with_slash_non_empty_file( - router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme: str + router_filesystem: Tuple[WeightedSemaphore, AsyncFS, Dict[str, str]], cloud_scheme: str ): sema, fs, bases = router_filesystem @@ -489,7 +488,7 @@ async def test_file_and_directory_error_with_slash_non_empty_file( async def test_file_and_directory_error_with_slash_non_empty_file_only_for_google_non_recursive( - router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], + router_filesystem: Tuple[WeightedSemaphore, AsyncFS, Dict[str, str]], ): sema, fs, bases = router_filesystem @@ -512,7 +511,7 @@ async def test_file_and_directory_error_with_slash_non_empty_file_only_for_googl async def test_file_and_directory_error_with_slash_empty_file_only( - router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme: str + router_filesystem: Tuple[WeightedSemaphore, AsyncFS, Dict[str, str]], cloud_scheme: str ): sema, fs, bases = router_filesystem @@ -537,7 +536,7 @@ async def test_file_and_directory_error_with_slash_empty_file_only( async def test_file_and_directory_error_with_slash_non_empty_file_only_google_non_recursive( - router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], + router_filesystem: Tuple[WeightedSemaphore, AsyncFS, Dict[str, str]], ): _, fs, bases = router_filesystem @@ -553,7 +552,7 @@ async def test_file_and_directory_error_with_slash_non_empty_file_only_google_no async def test_file_and_directory_error_with_slash_non_empty_file_only( - router_filesystem: Tuple[asyncio.Semaphore, AsyncFS, Dict[str, str]], cloud_scheme: str + router_filesystem: Tuple[WeightedSemaphore, AsyncFS, Dict[str, str]], cloud_scheme: str ): sema, fs, bases = router_filesystem diff --git a/web_common/pinned-requirements.txt b/web_common/pinned-requirements.txt index c0053f90721..aad16b9c33a 100644 --- a/web_common/pinned-requirements.txt +++ b/web_common/pinned-requirements.txt @@ -39,7 +39,7 @@ frozenlist==1.8.0 # -c web_common/../hail/python/pinned-requirements.txt # aiohttp # aiosignal -idna==3.11 +idna==3.12 # via # -c web_common/../gear/pinned-requirements.txt # -c web_common/../hail/python/dev/pinned-requirements.txt