Skip to content
Open
24 changes: 14 additions & 10 deletions ci/ci/ci.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from collections import defaultdict
from contextlib import AsyncExitStack
from datetime import timezone
from typing import Any, Callable, Dict, List, NoReturn, Optional, Set, Tuple, TypedDict
from typing import Any, Callable, Dict, List, NoReturn, Optional, Set, Tuple, TypedDict, Union

import aiohttp_session # type: ignore
import gidgethub
Expand Down Expand Up @@ -598,7 +598,7 @@ async def batch_callback_handler(request: web.Request):
async def deploy_status(request: web.Request, _) -> web.Response:
batch_client = request.app[AppKeys.BATCH_CLIENT]

async def get_failure_information(batch):
async def get_failure_information(batch: Union[Batch, MergeFailureBatch]) -> Any:
if isinstance(batch, MergeFailureBatch):
exc = batch.exception
return traceback.format_exception(type(exc), value=exc, tb=exc.__traceback__)
Expand All @@ -612,19 +612,22 @@ async def fetch_job_and_log(j):

return await asyncio.gather(*[fetch_job_and_log(j) for j in jobs if j['state'] in ('Error', 'Failed')])

wb_configs = [
{
async def wb_config(wb: WatchedBranch) -> Dict[str, Any]:
if wb.deploy_state in ('failure', 'checkout_failure'):
assert wb.deploy_batch is not None
failure_information = await get_failure_information(wb.deploy_batch)
else:
failure_information = None
return {
'branch': wb.branch.short_str(),
'sha': wb.sha,
'deploy_batch_id': wb.deploy_batch.id if wb.deploy_batch and isinstance(wb.deploy_batch, Batch) else None,
'deploy_state': wb.deploy_state,
'repo': wb.branch.repo.short_str(),
'failure_information': None
if wb.deploy_state == 'success'
else await get_failure_information(wb.deploy_batch),
'failure_information': failure_information,
}
for wb in watched_branches
]

wb_configs = [await wb_config(wb) for wb in watched_branches]
return json_response(wb_configs)


Expand Down Expand Up @@ -1045,7 +1048,8 @@ async def on_startup(app: web.Application):
exit_stack = AsyncExitStack()
app[AppKeys.EXIT_STACK] = exit_stack

client_session = httpx.client_session()
# 60s timeout: bulk GitHub GraphQL PR status fetches can take 10-30s.
client_session = httpx.client_session(timeout=60)
exit_stack.push_async_callback(client_session.close)

app[AppKeys.CLIENT_SESSION] = client_session
Expand Down
209 changes: 137 additions & 72 deletions ci/ci/github.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
import os
import random
import secrets
import time
from shlex import quote as shq
from typing import Any, Dict, Iterable, List, Optional, Sequence, Set, Union
from typing import Any, Dict, Iterable, List, Optional, Protocol, Sequence, Set, Union

import aiohttp
import aiohttp.client_exceptions
Expand Down Expand Up @@ -41,6 +42,10 @@
zulip_client = zulip.Client(config_file="/zulip-config/.zuliprc")

TRACKED_PRS = pc.Gauge('ci_tracked_prs', 'PRs currently being monitored by CI', ['build_state', 'review_state'])
WATCHED_BRANCH_UPDATE_LATENCY = pc.Gauge(
'ci_watched_branch_update_latency_seconds',
'Duration of the most recent WatchedBranch update cycle',
)

MAX_CONCURRENT_PR_BATCHES = 3

Expand Down Expand Up @@ -270,8 +275,8 @@
self.developers = developers

def set_build_state(self, build_state):
log.info(f'{self.short_str()}: Build state changing from {self.build_state} => {build_state}')
if build_state != self.build_state:
log.info(f'{self.short_str()}: build state changing: {self.build_state} => {build_state}')
self.decrement_pr_metric()
self.build_state = build_state
self.increment_pr_metric()
Expand Down Expand Up @@ -431,7 +436,10 @@
assignees.add(select_random_teammate(SERVICES_TEAM).gh_username)
if ASSIGN_COMPILER in self.body:
assignees.add(select_random_teammate(COMPILER_TEAM).gh_username)
if not assignees:
return
data = {'assignees': list(assignees)}
log.info(f'{self.short_str()}: assigning reviewers: {data}')
try:
await gh_client.post(
f'/repos/{self.target_branch.branch.repo.short_str()}/issues/{self.number}/assignees', data=data
Expand All @@ -441,72 +449,7 @@
except aiohttp.client_exceptions.ClientResponseError:
log.exception(f'{self.short_str()}: Unexpected exception in post to github: {data}')

async def _update_github(self, gh):
results = []
cursor = None
review_decision = None

def query():
return f"""
query {{
repository (
owner: "{self.target_branch.branch.repo.owner}",
name: "{self.target_branch.branch.repo.name}"
) {{
pullRequest (number: {self.number}) {{
reviewDecision
commits (last: 1) {{
nodes {{
commit {{
statusCheckRollup {{
contexts (first: 10{f', after: "{cursor}"' if cursor is not None else ''}) {{
nodes {{
__typename
... on CheckRun {{
name
conclusion
isRequired (pullRequestNumber: {self.number})
}}
... on StatusContext {{
context
state
isRequired (pullRequestNumber: {self.number})
}}
}}
pageInfo {{
endCursor
hasNextPage
}}
}}
}}
}}
}}
}}
}}
}}
}}
"""

def review_decision_and_commit_status(pull_request, rollup):
nonlocal review_decision
if review_decision is None:
review_decision = (
pull_request["reviewDecision"] if pull_request["reviewDecision"] is not None else "API_NONE"
)
if rollup is not None:
results.extend(rollup["contexts"]["nodes"])

while (
rollup := (
pull_request := (await gh.post("/graphql", data={"query": query()}))["data"]["repository"][
"pullRequest"
]
)["commits"]["nodes"][0]["commit"]["statusCheckRollup"]
) is not None and rollup["contexts"]["pageInfo"]["hasNextPage"]:
cursor = rollup["contexts"]["pageInfo"]["endCursor"]
review_decision_and_commit_status(pull_request, rollup)
review_decision_and_commit_status(pull_request, rollup)

def _apply_github_data(self, review_decision: str, check_nodes: List[Dict[str, Any]]):

Check warning on line 452 in ci/ci/github.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

ci/ci/github.py#L452

Method _apply_github_data has a cyclomatic complexity of 11 (limit is 8)
if review_decision == 'APPROVED':
review_state = 'approved'
elif review_decision == 'CHANGES_REQUESTED':
Expand All @@ -528,7 +471,7 @@
self.target_branch.state_changed = True

last_known_github_status = {}
for check in results:
for check in check_nodes:
if check["isRequired"]:
if (typename := check["__typename"]) == "StatusContext":
last_known_github_status[check["context"]] = github_status(check["state"])
Expand Down Expand Up @@ -745,6 +688,100 @@
"""


class _GitHubGraphQL(Protocol):
async def post(self, url: str, *, data: Any) -> Any: ...


_GITHUB_GRAPHQL_PR_CHUNK_SIZE = 100


async def _fetch_pr_github_data(

Check warning on line 698 in ci/ci/github.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

ci/ci/github.py#L698

Method _fetch_pr_github_data has a cyclomatic complexity of 15 (limit is 8)
gh: _GitHubGraphQL,
owner: str,
repo_name: str,
pr_numbers: List[int],
) -> Dict[int, tuple]:
"""Fetch review decisions and status check rollup for all PRs, batched in chunks.

Uses GraphQL aliases (pr_N: pullRequest(number: N) {{ ... }}) to reduce round-trips.
Chunks requests to avoid GitHub query complexity timeouts (~2s per 20-PR chunk vs ~6s for 56).
Paginates if any PR has more than 20 check contexts, which is not expected in practice.

Returns dict mapping pr_number -> (review_decision: str, check_nodes: List[dict]).
review_decision is the raw GraphQL PullRequestReviewDecision value, or "API_NONE" when null.
"""
accumulated_nodes: Dict[int, List[Dict[str, Any]]] = {n: [] for n in pr_numbers}
review_decisions: Dict[int, Optional[str]] = {n: None for n in pr_numbers}

chunks = [
pr_numbers[i : i + _GITHUB_GRAPHQL_PR_CHUNK_SIZE]
for i in range(0, len(pr_numbers), _GITHUB_GRAPHQL_PR_CHUNK_SIZE)
]
for chunk in chunks:
# remaining maps pr_number -> cursor (None = first page)
remaining: Dict[int, Optional[str]] = {n: None for n in chunk}

while remaining:

def build_query(remaining: Dict[int, Optional[str]]) -> str:
aliases = []
for pr_number, cursor in remaining.items():
cursor_arg = f', after: "{cursor}"' if cursor is not None else ''
aliases.append(f"""
pr_{pr_number}: pullRequest(number: {pr_number}) {{
reviewDecision
commits(last: 1) {{
nodes {{
commit {{
statusCheckRollup {{
contexts(first: 20{cursor_arg}) {{
nodes {{
__typename
... on CheckRun {{
name
conclusion
isRequired(pullRequestNumber: {pr_number})
}}
... on StatusContext {{
context
state
isRequired(pullRequestNumber: {pr_number})
}}
}}
pageInfo {{
endCursor
hasNextPage
}}
}}
}}
}}
}}
}}
}}
""")
return f'query {{ repository(owner: "{owner}", name: "{repo_name}") {{ {"".join(aliases)} }} }}'

repo_data = (await gh.post("/graphql", data={"query": build_query(remaining)}))["data"]["repository"]

next_remaining: Dict[int, Optional[str]] = {}
for pr_number in list(remaining.keys()):
pr_data = repo_data[f"pr_{pr_number}"]
if review_decisions[pr_number] is None:
raw = pr_data["reviewDecision"]
review_decisions[pr_number] = raw if raw is not None else "API_NONE"
rollup = pr_data["commits"]["nodes"][0]["commit"]["statusCheckRollup"]
if rollup is not None:
accumulated_nodes[pr_number].extend(rollup["contexts"]["nodes"])
if rollup["contexts"]["pageInfo"]["hasNextPage"]:
next_remaining[pr_number] = rollup["contexts"]["pageInfo"]["endCursor"]

if next_remaining:
log.warning(f'_fetch_pr_github_data: pagination required for PRs {list(next_remaining.keys())}')
remaining = next_remaining

return {n: (review_decisions[n] or "API_NONE", accumulated_nodes[n]) for n in pr_numbers}


class WatchedBranch(Code):
def __init__(
self,
Expand Down Expand Up @@ -828,6 +865,7 @@
log.info(f'already updating {self.short_str()}')
return

t_update_start = time.monotonic()
try:
log.info(f'start update {self.short_str()}')
self.updating = True
Expand All @@ -847,7 +885,9 @@
if (self.deploy_batch is None or self.deploy_state is not None) and not frozen and self.mergeable:
await self.try_to_merge(gh)
finally:
log.info(f'update done {self.short_str()}')
t_total = time.monotonic() - t_update_start
WATCHED_BRANCH_UPDATE_LATENCY.set(t_total)
log.info(f'update done {self.short_str()} in {t_total:.1f}s')
self.updating = False

async def try_to_merge(self, gh):
Expand Down Expand Up @@ -890,8 +930,33 @@
for pr in new_prs.values():
await pr.assign_gh_reviewer_if_requested(gh)

for pr in new_prs.values():
await pr._update_github(gh)
if new_prs:
pr_github_data = await _fetch_pr_github_data(
gh, self.branch.repo.owner, self.branch.repo.name, list(new_prs.keys())
)

summary = []
for pr_number, (review_decision, check_nodes) in pr_github_data.items():
required_nodes = [c for c in check_nodes if c["isRequired"]]
required_statuses = [
github_status(c["conclusion"] if c["__typename"] == "CheckRun" else c["state"])
for c in required_nodes
]
Comment thread
cjllanwarne marked this conversation as resolved.
if any(s == GithubStatus.FAILURE for s in required_statuses):
check_decision = 'failing'
elif required_statuses and all(s == GithubStatus.SUCCESS for s in required_statuses):
check_decision = 'passing'
else:
check_decision = 'pending'
summary.append((pr_number, review_decision, len(check_nodes), len(required_nodes), check_decision))
log.info(
f'update github {self.short_str()}: PR data fetched '
f'(pr_number, review_decision, total_checks, required_checks, check_decision): {summary}'
)

for pr in new_prs.values():
review_decision, check_nodes = pr_github_data[pr.number]
pr._apply_github_data(review_decision, check_nodes)

async def _update_deploy(self, batch_client, db: Database):
assert self.deployable
Expand Down
4 changes: 3 additions & 1 deletion ci/ci/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,14 @@ class GithubStatus(Enum):
FAILURE = 'failure'


def github_status(state: str) -> GithubStatus:
def github_status(state: Optional[str]) -> GithubStatus:
"""
Converts a state for a commit status (https://docs.github.com/en/graphql/reference/enums#statusstate)
or a conclusion for a check (https://docs.github.com/en/graphql/reference/enums#checkconclusionstate)
from the GraphQL API to a GithubStatus.
"""
if state is None:
return GithubStatus.PENDING
if state in {"PENDING", "EXPECTED", "ACTION_REQUIRED", "STALE"}:
return GithubStatus.PENDING
if state in {"FAILURE", "ERROR", "TIMED_OUT", "CANCELLED", "STARTUP_FAILURE", "SKIPPED"}:
Expand Down
40 changes: 40 additions & 0 deletions ci/unit-test/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""Configure the unit-test environment before any ci.* modules are imported.

In CI these values come from the real deployment environment (env vars + /global-config volume
mount). Locally they don't exist, so we set safe defaults and mock the config reader.
"""

import atexit
import json
import os
import unittest.mock

# Env vars read at module-import time by gear.profiling and ci.environment.
# setdefault leaves real CI values in place.
os.environ.setdefault('HAIL_SHA', 'test-sha')
os.environ.setdefault('HAIL_DEFAULT_NAMESPACE', 'default')
os.environ.setdefault('CLOUD', 'gcp')
os.environ.setdefault('HAIL_CI_UTILS_IMAGE', 'gcr.io/hail-vdc/ci-utils:test')
os.environ.setdefault('HAIL_BUILDKIT_IMAGE', 'gcr.io/hail-vdc/buildkit:test')
os.environ.setdefault('HAIL_CI_STORAGE_URI', 'gs://hail-ci-test/build')
os.environ.setdefault('HAIL_CI_GITHUB_CONTEXT', 'ci-test')

if not os.path.exists('/global-config'):
# Patch gear.cloud_config.read_config_secret so that ci.environment's module-level
# get_global_config() call (and the subsequent get_gcp_config() call) succeed locally.
_fake_global_config = {
'cloud': 'gcp',
'docker_prefix': 'gcr.io/hail-vdc',
'docker_root_image': 'ubuntu:22.04',
'domain': 'hail.is',
'kubernetes_server_url': 'https://k8s.example.com',
'default_namespace': 'default',
# Fields required by GCPConfig.from_global_config
'batch_gcp_regions': json.dumps(['us-central1']),
'gcp_region': 'us-central1',
'gcp_project': 'hail-vdc',
'gcp_zone': 'us-central1-a',
}
_patcher = unittest.mock.patch('gear.cloud_config.read_config_secret', return_value=_fake_global_config)
_patcher.start()
atexit.register(_patcher.stop)
2 changes: 2 additions & 0 deletions ci/unit-test/pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[pytest]
asyncio_mode = auto
Loading
Loading