Coverage for src / documint_mcp / github.py: 0%
206 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 22:30 -0400
1"""GitHub App metadata, repository sync, and PR automation for Documint V1."""
3from __future__ import annotations
5import base64
6import hashlib
7import hmac
8from dataclasses import dataclass
9from datetime import UTC, datetime, timedelta
10from typing import Any
11from urllib.parse import quote
13import httpx
14import jwt
15from fastapi import HTTPException
17from .config import settings
18from .models import DriftJobRequest, SourceSignalType
# Webhook event names that analyze_github_webhook acts on; any other event
# (apart from the explicitly handled "ping") is reported as unsupported.
SUPPORTED_GITHUB_EVENTS = (
    "push",
    "pull_request",
    "release",
    "installation",
    "installation_repositories",
)

# Permission map published through github_app_manifest().
GITHUB_APP_PERMISSIONS = {
    "contents": "write",
    "metadata": "read",
    "pull_requests": "write",
}

# Per-event `action` values that lead to processing; every other action on
# the same event is ignored.
GITHUB_PR_ACTIONS = {
    "opened",
    "reopened",
    "ready_for_review",
    "synchronize",
}
GITHUB_RELEASE_ACTIONS = {
    "edited",
    "prereleased",
    "published",
    "released",
}
GITHUB_INSTALLATION_ACTIONS = {
    "created",
    "new_permissions_accepted",
}
GITHUB_INSTALLATION_REPO_ACTIONS = {
    "added",
    "removed",
}
@dataclass(frozen=True)
class GitHubWebhookAction:
    """Immutable outcome of classifying one GitHub webhook delivery.

    Built by ``analyze_github_webhook`` so the API layer can decide whether
    to act on an event without inspecting the raw payload again.
    """

    # True when the event should trigger work; False when it is ignored.
    should_process: bool
    # Category of the decision: "ignored", "installation_sync" or "drift".
    kind: str
    # The webhook event name the decision was made for.
    event_name: str
    # "owner/name" taken from the payload, or None when it is absent.
    repository: str | None
    # Installation id from the payload as a string, when present.
    installation_id: str | None = None
    # Drift job to enqueue; populated for "drift" decisions.
    drift_request: DriftJobRequest | None = None
    # Paths touched by a push event; empty for every other event.
    changed_files: tuple[str, ...] = ()
    # Commit SHA, git ref, release tag or installation id tied to the event.
    ref: str | None = None
    # Why the event was ignored; None for processed events.
    reason: str | None = None
@dataclass(frozen=True)
class GitHubPullRequestResult:
    """Immutable summary of a pull request that was created or updated."""

    # Head branch that carries the committed change.
    branch_name: str
    # Pull request title as reported by GitHub (or the requested title).
    title: str
    # Browser URL of the pull request.
    url: str
    # Pull request state string, e.g. "open".
    state: str
    # Pull request number, when GitHub returned one.
    number: int | None = None
def github_app_manifest() -> dict[str, Any]:
    """Return the public GitHub App metadata exposed by the API.

    The result describes the app (name, slug, id), the URLs GitHub and users
    should be pointed at, the subscribed events and requested permissions,
    and whether each required credential is configured. Secret values are
    never included; only booleans derived from them.

    Returns:
        A fresh dictionary. The ``events`` list and ``permissions`` mapping
        are copies, so callers may mutate the result without altering the
        module-level constants.
    """
    slug = settings.github_app_slug
    # Without a slug there is no public installation page to link to.
    install_url = (
        f"https://github.com/apps/{slug}/installations/new" if slug else None
    )
    configured_fields = {
        "github_app_id": bool(settings.github_app_id),
        "github_app_slug": bool(settings.github_app_slug),
        "github_webhook_secret": bool(settings.github_webhook_secret),
        "github_app_private_key": bool(settings.github_app_private_key),
    }
    return {
        "name": settings.github_app_name,
        "slug": settings.github_app_slug,
        "app_id": settings.github_app_id,
        "repository": f"{settings.project_owner}/{settings.project_repo}",
        "webhook_url": f"{settings.api_base_url.rstrip('/')}/integrations/github/webhooks",
        "setup_url": f"{settings.public_base_url.rstrip('/')}/app?setup=github-app",
        "install_url": install_url,
        "events": list(SUPPORTED_GITHUB_EVENTS),
        # Copy, like ``events`` above, so the shared constant is never
        # handed out by reference.
        "permissions": dict(GITHUB_APP_PERMISSIONS),
        # The app is usable only when all four credentials are present.
        "configured": all(configured_fields.values()),
        "configured_fields": configured_fields,
    }
def verify_github_signature(body: bytes, signature_header: str | None) -> None:
    """Validate a GitHub webhook signature against the configured secret.

    Args:
        body: Raw request body exactly as received.
        signature_header: Signature header value, expected in the form
            ``sha256=<hex digest>``.

    Raises:
        HTTPException: 503 when no webhook secret is configured and debug
            mode is off; 401 when the header is missing, does not start with
            ``sha256=``, or does not match the computed digest.
    """
    secret = settings.github_webhook_secret
    if not secret:
        # Debug mode allows unsigned deliveries when no secret is set.
        if settings.debug:
            return
        raise HTTPException(
            status_code=503,
            detail="GitHub webhook secret is not configured",
        )

    if not signature_header:
        raise HTTPException(status_code=401, detail="Missing GitHub webhook signature")
    if not signature_header.startswith("sha256="):
        raise HTTPException(status_code=401, detail="Invalid GitHub webhook signature")

    digest = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    expected = "sha256=" + digest
    # compare_digest raises TypeError for str arguments containing non-ASCII
    # characters. Comparing bytes keeps a malformed header on the 401 path
    # instead of surfacing as an unhandled error.
    supplied = signature_header.encode("utf-8", errors="replace")
    if not hmac.compare_digest(expected.encode("utf-8"), supplied):
        raise HTTPException(status_code=401, detail="Invalid GitHub webhook signature")
def analyze_github_webhook(
    event_name: str,
    payload: dict[str, Any],
) -> GitHubWebhookAction:
    """Translate a GitHub webhook payload into a Documint action.

    Args:
        event_name: Name of the webhook event being delivered.
        payload: Decoded JSON body of the webhook.

    Returns:
        A ``GitHubWebhookAction``. ``ping`` events, unsupported events and
        unlisted ``action`` values yield an "ignored" action carrying a
        reason. Installation events yield "installation_sync". Push, pull
        request and release events yield "drift" with a ``DriftJobRequest``
        whose ``project_id`` is left as an empty string.
    """
    repo_name = _repository_full_name(payload)
    install_id = _installation_id(payload)

    def ignored(why: str) -> GitHubWebhookAction:
        # Shared shape of every "do nothing" decision.
        return GitHubWebhookAction(
            should_process=False,
            kind="ignored",
            event_name=event_name,
            repository=repo_name,
            installation_id=install_id,
            reason=why,
        )

    def drift(
        signal: SourceSignalType,
        ref: Any,
        files: tuple[str, ...] = (),
    ) -> GitHubWebhookAction:
        # Shared shape of every decision that enqueues a drift job.
        return GitHubWebhookAction(
            should_process=True,
            kind="drift",
            event_name=event_name,
            repository=repo_name,
            installation_id=install_id,
            drift_request=DriftJobRequest(
                project_id="",
                signal_type=signal,
                changed_files=list(files),
            ),
            changed_files=files,
            ref=ref,
        )

    if event_name == "ping":
        return ignored("ping")
    if event_name not in SUPPORTED_GITHUB_EVENTS:
        return ignored("unsupported_event")

    if event_name == "installation" or event_name == "installation_repositories":
        action = payload.get("action")
        if event_name == "installation":
            if action not in GITHUB_INSTALLATION_ACTIONS:
                return ignored(f"ignored_installation_action:{action}")
        elif action not in GITHUB_INSTALLATION_REPO_ACTIONS:
            return ignored(f"ignored_installation_repositories_action:{action}")
        # Both installation events resync, keyed by the installation id.
        return GitHubWebhookAction(
            should_process=True,
            kind="installation_sync",
            event_name=event_name,
            repository=repo_name,
            installation_id=install_id,
            ref=install_id,
        )

    if event_name == "push":
        touched = tuple(_collect_push_files(payload))
        # Prefer the head commit SHA; fall back to the pushed ref name.
        head = payload.get("after") or payload.get("ref")
        return drift(SourceSignalType.PUSH, head, touched)

    if event_name == "pull_request":
        action = payload.get("action")
        if action not in GITHUB_PR_ACTIONS:
            return ignored(f"ignored_pull_request_action:{action}")
        return drift(
            SourceSignalType.PULL_REQUEST,
            _nested_value(payload, "pull_request", "head", "sha"),
        )

    # The only supported event left at this point is "release".
    action = payload.get("action")
    if action not in GITHUB_RELEASE_ACTIONS:
        return ignored(f"ignored_release_action:{action}")
    return drift(
        SourceSignalType.RELEASE,
        _nested_value(payload, "release", "tag_name"),
    )
def list_installation_repositories(installation_id: str) -> list[dict[str, Any]]:
    """Fetch accessible repositories for a GitHub App installation.

    Pages through ``/installation/repositories`` 100 entries at a time and
    stops at the first page that is short or whose ``repositories`` value is
    not a list. Entries that are not dictionaries are dropped.

    Args:
        installation_id: Installation to mint an access token for.

    Returns:
        The repository objects gathered across all pages.
    """
    access_token = get_installation_access_token(installation_id)
    collected: list[dict[str, Any]] = []
    page_number = 1
    while True:
        data = _github_request(
            "GET",
            "/installation/repositories",
            token=access_token,
            params={"per_page": 100, "page": page_number},
        ).json()
        batch = data.get("repositories", [])
        if not isinstance(batch, list):
            return collected
        for entry in batch:
            if isinstance(entry, dict):
                collected.append(entry)
        # A page with fewer than 100 entries is the last one.
        if len(batch) < 100:
            return collected
        page_number += 1
def create_or_update_pull_request(
    *,
    installation_id: str,
    owner: str,
    repo: str,
    base_branch: str,
    branch_name: str,
    target_path: str,
    content_markdown: str,
    title: str,
    body: str,
    commit_message: str,
) -> GitHubPullRequestResult:
    """Create or update a PR backed by a committed branch on GitHub.

    Steps, in order: mint an installation token, resolve the base branch
    head, make sure ``branch_name`` exists, commit ``content_markdown`` to
    ``target_path`` on that branch, then either retitle the open pull
    request for the branch or open a new one.

    Returns:
        A ``GitHubPullRequestResult`` built from GitHub's response, falling
        back to the requested title and the state "open" when the response
        omits them.
    """
    access_token = get_installation_access_token(installation_id)
    base_head = _get_branch_sha(owner, repo, base_branch, access_token)
    _ensure_branch(owner, repo, branch_name, base_head, access_token)

    # Look for the file's blob SHA on the work branch first, then on the
    # base branch; the value is passed to the upsert as the existing SHA.
    blob_sha = _get_content_sha(owner, repo, target_path, branch_name, access_token)
    if blob_sha is None:
        blob_sha = _get_content_sha(owner, repo, target_path, base_branch, access_token)

    _upsert_file(
        owner=owner,
        repo=repo,
        path=target_path,
        branch=branch_name,
        content=content_markdown,
        message=commit_message,
        token=access_token,
        sha=blob_sha,
    )

    open_pr = _find_existing_pull_request(
        owner=owner,
        repo=repo,
        base_branch=base_branch,
        branch_name=branch_name,
        token=access_token,
    )
    if open_pr is None:
        pr_payload = _github_request(
            "POST",
            f"/repos/{owner}/{repo}/pulls",
            token=access_token,
            json_body={
                "title": title,
                "body": body,
                "head": branch_name,
                "base": base_branch,
                "maintainer_can_modify": True,
            },
        ).json()
    else:
        pr_payload = _github_request(
            "PATCH",
            f"/repos/{owner}/{repo}/pulls/{open_pr['number']}",
            token=access_token,
            json_body={"title": title, "body": body},
        ).json()

    raw_number = pr_payload.get("number")
    return GitHubPullRequestResult(
        branch_name=branch_name,
        title=str(pr_payload.get("title") or title),
        url=str(pr_payload["html_url"]),
        state=str(pr_payload.get("state") or "open"),
        number=int(raw_number) if raw_number else None,
    )
def get_installation_access_token(installation_id: str) -> str:
    """Mint a short-lived installation token for a GitHub App installation.

    Signs an RS256 JWT with the configured app id and private key, then
    exchanges it at ``/app/installations/{installation_id}/access_tokens``.

    Raises:
        HTTPException: 503 when the app id or private key is not configured;
            502 when the response carries no non-empty string ``token``.
    """
    app_id = settings.github_app_id
    private_key = settings.github_app_private_key
    if not (app_id and private_key):
        raise HTTPException(
            status_code=503,
            detail="GitHub App credentials are not configured for installation automation",
        )

    now = datetime.now(tz=UTC)
    # Issued-at is backdated by 60 seconds and expiry is set 9 minutes ahead.
    claims = {
        "iat": int((now - timedelta(seconds=60)).timestamp()),
        "exp": int((now + timedelta(minutes=9)).timestamp()),
        "iss": app_id,
    }
    app_jwt = jwt.encode(
        claims,
        _normalized_private_key(private_key),
        algorithm="RS256",
    )

    exchange = _github_request(
        "POST",
        f"/app/installations/{installation_id}/access_tokens",
        bearer_token=app_jwt,
    )
    access_token = exchange.json().get("token")
    if isinstance(access_token, str) and access_token:
        return access_token
    raise HTTPException(
        status_code=502,
        detail="GitHub did not return an installation access token",
    )
def _github_request(
    method: str,
    path: str,
    *,
    token: str | None = None,
    bearer_token: str | None = None,
    json_body: dict[str, Any] | None = None,
    params: dict[str, str | int | float | bool | None] | None = None,
) -> httpx.Response:
    """Send one request to the GitHub API and return the response.

    Args:
        method: HTTP verb.
        path: Path relative to ``settings.github_api_url``.
        token: Installation token, sent as ``Authorization: token ...``.
        bearer_token: App JWT, sent as ``Authorization: Bearer ...``. Takes
            precedence when both credentials are supplied.
        json_body: Optional JSON request body.
        params: Optional query parameters.

    Raises:
        HTTPException: 502 for any response with status 400 or above. The
            detail embeds the method, path, upstream status code and body;
            callers in this module inspect that text, so its format must
            stay stable.
    """
    if bearer_token is not None:
        authorization = f"Bearer {bearer_token}"
    elif token is not None:
        authorization = f"token {token}"
    else:
        authorization = None

    request_headers = {
        "Accept": "application/vnd.github+json",
        "X-GitHub-Api-Version": settings.github_api_version,
        "User-Agent": "documint/0.3",
    }
    if authorization is not None:
        request_headers["Authorization"] = authorization

    with httpx.Client(base_url=settings.github_api_url, timeout=20.0) as client:
        response = client.request(
            method,
            path,
            headers=request_headers,
            json=json_body,
            params=params,
        )
        if response.status_code < 400:
            return response
        detail = response.text
        raise HTTPException(
            status_code=502,
            detail=f"GitHub API error for {method} {path}: {response.status_code} {detail}",
        )
def _get_branch_sha(owner: str, repo: str, branch: str, token: str) -> str:
    """Return the commit SHA that ``branch`` points at.

    Raises:
        HTTPException: 502 when the ref payload has no non-empty string SHA
            under ``object.sha``.
    """
    # The branch name is fully percent-encoded, slashes included.
    ref_path = f"/repos/{owner}/{repo}/git/ref/heads/{quote(branch, safe='')}"
    ref_payload = _github_request("GET", ref_path, token=token).json()
    target = ref_payload.get("object", {})
    head_sha = target.get("sha")
    if isinstance(head_sha, str) and head_sha:
        return head_sha
    raise HTTPException(
        status_code=502, detail="GitHub branch ref did not include a SHA"
    )
def _ensure_branch(
    owner: str, repo: str, branch: str, base_sha: str, token: str
) -> None:
    """Create ``branch`` at ``base_sha`` unless it already exists.

    Args:
        owner: Repository owner.
        repo: Repository name.
        branch: Branch to look up and, if missing, create.
        base_sha: Commit SHA the new branch should point at.
        token: Installation access token.

    Raises:
        HTTPException: Re-raised when the lookup fails with anything other
            than an upstream 404, or when creating the ref fails.
    """
    ref_path = f"/repos/{owner}/{repo}/git/ref/heads/{quote(branch, safe='')}"
    # _github_request reports failures as
    # "GitHub API error for <method> <path>: <status> <body>". Matching the
    # exact prefix avoids mistaking a "404" inside the path or response body
    # (for example a branch named "fix-404-page") for a missing ref.
    not_found_prefix = f"GitHub API error for GET {ref_path}: 404 "
    try:
        _github_request("GET", ref_path, token=token)
    except HTTPException as exc:
        if not str(exc.detail).startswith(not_found_prefix):
            raise
    else:
        # The ref exists; nothing to create.
        return
    _github_request(
        "POST",
        f"/repos/{owner}/{repo}/git/refs",
        token=token,
        json_body={"ref": f"refs/heads/{branch}", "sha": base_sha},
    )
def _get_content_sha(
    owner: str,
    repo: str,
    path: str,
    branch: str,
    token: str,
) -> str | None:
    """Return the blob SHA of ``path`` on ``branch``, or None if unavailable.

    Args:
        owner: Repository owner.
        repo: Repository name.
        path: Repository-relative file path.
        branch: Branch (or other ref) to read from.
        token: Installation access token.

    Returns:
        The ``sha`` field of the contents response, or None when GitHub
        answers 404, the response is not a single object, or the field is
        missing or empty.

    Raises:
        HTTPException: Re-raised for any upstream failure other than 404.
    """
    contents_path = f"/repos/{owner}/{repo}/contents/{quote(path, safe='/')}"
    # _github_request reports failures as
    # "GitHub API error for <method> <path>: <status> <body>". Matching the
    # exact prefix avoids treating, say, a 403 on "docs/404.md" as a missing
    # file.
    not_found_prefix = f"GitHub API error for GET {contents_path}: 404 "
    try:
        payload = _github_request(
            "GET",
            contents_path,
            token=token,
            params={"ref": branch},
        ).json()
    except HTTPException as exc:
        if str(exc.detail).startswith(not_found_prefix):
            return None
        raise
    # A directory path yields a JSON array rather than an object; there is
    # no single blob SHA to report in that case.
    if not isinstance(payload, dict):
        return None
    sha = payload.get("sha")
    return sha if isinstance(sha, str) and sha else None
def _upsert_file(
    *,
    owner: str,
    repo: str,
    path: str,
    branch: str,
    content: str,
    message: str,
    token: str,
    sha: str | None,
) -> None:
    """Write ``content`` to ``path`` on ``branch`` through the contents API.

    The text is UTF-8 encoded and base64-wrapped. ``sha`` is included in the
    request only when it is truthy.
    """
    encoded_content = base64.b64encode(content.encode("utf-8")).decode("ascii")
    request_body: dict[str, Any] = {
        "message": message,
        "content": encoded_content,
        "branch": branch,
    }
    if sha:
        request_body["sha"] = sha
    contents_path = f"/repos/{owner}/{repo}/contents/{quote(path, safe='/')}"
    _github_request("PUT", contents_path, token=token, json_body=request_body)
def _find_existing_pull_request(
    *,
    owner: str,
    repo: str,
    base_branch: str,
    branch_name: str,
    token: str,
) -> dict[str, Any] | None:
    """Return the first open PR from ``owner:branch_name`` into ``base_branch``.

    Returns None when the listing is empty, is not a list, or its first
    entry is not a dictionary.
    """
    query = {
        "state": "open",
        "base": base_branch,
        "head": f"{owner}:{branch_name}",
    }
    pulls = _github_request(
        "GET",
        f"/repos/{owner}/{repo}/pulls",
        token=token,
        params=query,
    ).json()
    if isinstance(pulls, list) and pulls:
        candidate = pulls[0]
        if isinstance(candidate, dict):
            return candidate
    return None
531def _normalized_private_key(raw_private_key: str) -> str:
532 return raw_private_key.replace("\\n", "\n").strip()
535def _repository_full_name(payload: dict[str, Any]) -> str | None:
536 repository = payload.get("repository")
537 if not isinstance(repository, dict):
538 return None
539 full_name = repository.get("full_name")
540 return full_name if isinstance(full_name, str) and full_name.strip() else None
543def _installation_id(payload: dict[str, Any]) -> str | None:
544 installation = payload.get("installation")
545 if not isinstance(installation, dict):
546 return None
547 installation_id = installation.get("id")
548 if isinstance(installation_id, int):
549 return str(installation_id)
550 if isinstance(installation_id, str) and installation_id.strip():
551 return installation_id
552 return None
555def _collect_push_files(payload: dict[str, Any]) -> list[str]:
556 changed_files: set[str] = set()
557 commits = payload.get("commits")
558 if not isinstance(commits, list):
559 commits = []
560 for commit in commits:
561 if not isinstance(commit, dict):
562 continue
563 for key in ("added", "modified", "removed"):
564 values = commit.get(key)
565 if not isinstance(values, list):
566 continue
567 changed_files.update(
568 path.strip()
569 for path in values
570 if isinstance(path, str) and path.strip()
571 )
572 return sorted(changed_files)
575def _nested_value(payload: dict[str, Any], *keys: str) -> str | None:
576 current: Any = payload
577 for key in keys:
578 if not isinstance(current, dict):
579 return None
580 current = current.get(key)
581 return current if isinstance(current, str) and current.strip() else None