[batch] Use gcloud to localize large input files from GCS#15350
[batch] Use gcloud to localize large input files from GCS#15350kush-chandra wants to merge 10 commits into
Conversation
fbc83d5 to
c514da6
Compare
| timeout=self._timeout, | ||
| ) | ||
| except FileNotFoundError: | ||
| os.makedirs(os.path.dirname(local_dest), exist_ok=True) |
There was a problem hiding this comment.
Can we check ahead of time that the directory exists (instead of letting it throw an exception that we then have to catch do resubmit the exact same command in the except block?)
There was a problem hiding this comment.
Yep, I've refactored this. The original copier code used that pattern to rely on an implementation detail of local_fs to catch an edge case where we're trying to expand a directory into an existing file which is not a directory, so I made that check explicit here.
| max_workers=8, | ||
| ) | ||
| except FileNotFoundError: | ||
| os.makedirs(os.path.dirname(local_dest), exist_ok=True) |
| success = False | ||
| threads_acquired = 0 | ||
| try: | ||
| for _ in range(0, 8): |
There was a problem hiding this comment.
There's two risky things here:
- We already acquired a sema slot from
bounded_gather2. Especially with thexfer_semawaits introducing an easy place for tasks to be paused/suspended, it's very possible that lots of tasks get a bounded_gather2 sema slots and so the sema pool is empty before we try to acquire another 8 slots for the download here - Because this acquiring logic is not atomic, it's possible that a thread might pick up a sub-portion of its 8 required slots here, then get suspended, and those reserved slots are locked up in this task which isn't doing anything with them yet. It looks like the weighted semaphor semantics is maybe more what you're looking for here (you can atomically request a number of slots)?
There was a problem hiding this comment.
I changed the copier semaphore to a weighted semaphore so this can atomically request 8 threads from the shared pool. xfer_sema appears to allow more simultaneous transfers than sema does, so I think this mitigates the deadlock risk. (The original copier has the same thresholds for sema & xfer_sema, so I'm wondering how much value xfer_sema is actually providing like this.)
| ) | ||
| credentials = kwargs.get('credentials') | ||
| if isinstance(credentials, GoogleCredentials): | ||
| access_token = credentials.access_token |
There was a problem hiding this comment.
I believe these access tokens have a 1 hour lifespan before they'll be rejected. We can split on the type of google credentials we have and use one of Credentials.from_service_account_info(credentials.key) for service accounts or Credentials(token=None, refresh_token=credentials.credentials['refresh_token'], client_id=credentials.credentials['client_id'], client_secret=credentials.credentials['client_secret'], token_uri='https://oauth2.googleapis.com/token') for application-default credentials, or Client(credentials = None) for automatic application-default credentials.
In fact, credentials.access_token looks like a method (and an async method at that), so this would return a method to produce a coroutine rather than a value, so I'm not sure this code path would work even temporarily. I think we must be going the credentials=None / application-default route below during the on-VM testing, but I'm not sure it'd work if we tried to pass credentials in on a local laptop
There was a problem hiding this comment.
Thanks for catching that. I split it up into service account/token creds/anonymous creds/application default creds. From what I could tell, VM testing is indeed using the default path.
8a18981 to
c1b9bbf
Compare
1cc7e2d to
289df0d
Compare
289df0d to
eb7b283
Compare
| if not os.path.exists(os.path.dirname(dest)): | ||
| os.makedirs(os.path.dirname(dest), exist_ok=True) |
There was a problem hiding this comment.
Blocking I/O operations (os.path.exists and os.makedirs) are called in an async method without being wrapped in blocking_to_async. This blocks the event loop.
if not os.path.exists(os.path.dirname(dest)):
os.makedirs(os.path.dirname(dest), exist_ok=True)
# Should be:
await blocking_to_async(self._thread_pool, os.makedirs, os.path.dirname(dest), exist_ok=True)Also, the os.path.exists check is redundant since exist_ok=True handles existing directories.
| if not os.path.exists(os.path.dirname(dest)): | |
| os.makedirs(os.path.dirname(dest), exist_ok=True) | |
| await blocking_to_async(self._thread_pool, os.makedirs, os.path.dirname(dest), exist_ok=True) |
Spotted by Graphite
Is this helpful? React 👍 or 👎 to let us know.
ead1b66 to
442912c
Compare
d5ee35f to
b9549ca
Compare
a48fe0e to
74a0e0a
Compare
74a0e0a to
699d16c
Compare
|
If #15395 merges, the test batch for this PR would only have run the following steps: |
699d16c to
f7db91a
Compare
…15273) ## Change Description Fixes hail-is#15011 Google vends a tool for parallelized file downloads in their Python client library: https://docs.cloud.google.com/python/docs/reference/storage/latest/google.cloud.storage.transfer_manager.html Using this instead of our copier code to download from GCS buckets to local storage lets us download larger files which our copier code cannot handle without frequent timeouts & failures. I compared the performance when a job downloads an input file using the copier vs using the Google library: | File size | Copier code | transfer_manager | |--------|--------|--------| | 500B | 1s | 0.8s | | 5M | 0.6s | 0.8s | | 1G | 50s | 2.7s | | 5G | 100s | 13.8s | | 10G | 219s | 27s | | 21G | failed | 54s | | 54G | failed | 135s | Performance is similar for smaller files with variance mostly due to network bandwidth. At 1G, the copier starts to hit transient errors on some chunks & retries. Beyond 20G, jobs using the copier failed in the input stage after an indeterminate length of time. ## Security Assessment Delete all except the correct answer: - This change potentially impacts the Hail Batch instance as deployed by Broad Institute in GCP ### Impact Rating Delete all except the correct answer: - This change has a medium security impact ### Impact Description Replace this content with a description of the impact of the change: This change instantiates a new Google client to download files from GCS buckets. It uses the same local credential files as the existing custom clients we've created. ### Appsec Review - [x] Required: The impact has been assessed and approved by appsec --------- Co-authored-by: Chris Llanwarne <cjllanwarne@users.noreply.github.com> # Conflicts: # gear/pinned-requirements.txt # hail/python/hailtop/pinned-requirements.txt # hail/python/pinned-requirements.txt # Conflicts: # gear/pinned-requirements.txt # hail/python/hailtop/pinned-requirements.txt # hail/python/pinned-requirements.txt
f7db91a to
5c61612
Compare
5c61612 to
fae031c
Compare
cjllanwarne
left a comment
There was a problem hiding this comment.
I have a few thoughts -
- I don't fully understand what's happening here. Would you be willing to give a walkthrough?
- I'm a little scared given how many big - and pretty foundational - changes are happening to such an intricate (brittle!) system
- One of my original big hopes was to reduce the maintenance overhead of our in-house copier having to re-implement
gcloud storage cpin python code. But we are doubling down on that in this PR
Before we commit to this route... I'd really like to see what a "shell out to gcloud storage cp" version would look like. Even if we have to make a couple of calls because it doesn't support many <=> many directory transfers, that still feels like it would be better than continuing to follow this "re-implement it ourselves" codebase. What do you think?
Change Description
Fixes #15011
This is a redo of #15273 which addresses the scalability issues with that approach:
The timing improvements for large files remain the same as in the original change, and there is now no spike in latency when downloading hundreds of smaller files.
Security Assessment
Impact Rating
Impact Description
This change instantiates a new Google client to download files from GCS buckets. It uses the same local credential files as the existing custom clients we've created. I manually verified the problem cases with the original change now work as intended and am working on adding additional testing for those paths.
Appsec Review