Coverage for src / documint_mcp / github.py: 0%

206 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-30 22:30 -0400

1"""GitHub App metadata, repository sync, and PR automation for Documint V1.""" 

2 

3from __future__ import annotations 

4 

5import base64 

6import hashlib 

7import hmac 

8from dataclasses import dataclass 

9from datetime import UTC, datetime, timedelta 

10from typing import Any 

11from urllib.parse import quote 

12 

13import httpx 

14import jwt 

15from fastapi import HTTPException 

16 

17from .config import settings 

18from .models import DriftJobRequest, SourceSignalType 

19 

# Webhook event names advertised in the app manifest; analyze_github_webhook
# ignores any event that is not listed here.
SUPPORTED_GITHUB_EVENTS = (
    "push",
    "pull_request",
    "release",
    "installation",
    "installation_repositories",
)

# Permission map published under "permissions" in the app manifest.
GITHUB_APP_PERMISSIONS = {
    "contents": "write",
    "metadata": "read",
    "pull_requests": "write",
}

# Per-event ``action`` values that lead to processing; any other action on
# the same event is reported as ignored.
GITHUB_PR_ACTIONS = {
    "opened",
    "reopened",
    "ready_for_review",
    "synchronize",
}
GITHUB_RELEASE_ACTIONS = {
    "edited",
    "prereleased",
    "published",
    "released",
}
GITHUB_INSTALLATION_ACTIONS = {
    "created",
    "new_permissions_accepted",
}
GITHUB_INSTALLATION_REPO_ACTIONS = {
    "added",
    "removed",
}

36 

37 

@dataclass(frozen=True)
class GitHubWebhookAction:
    """Immutable, normalized decision about a single GitHub webhook delivery.

    Produced by the webhook analysis step and consumed by the API layer.
    """

    # Whether the delivery should be acted on (False for ignored deliveries).
    should_process: bool
    # Decision category, e.g. "ignored", "installation_sync" or "drift".
    kind: str
    # GitHub event name the decision was derived from.
    event_name: str
    # "owner/name" of the repository in the payload, when present.
    repository: str | None
    # Installation id from the payload, as a string, when present.
    installation_id: str | None = None
    # Drift job request to run for "drift" decisions.
    drift_request: DriftJobRequest | None = None
    # Files touched by the event; only filled for push events.
    changed_files: tuple[str, ...] = ()
    # Event-specific reference (commit SHA, git ref, tag or installation id).
    ref: str | None = None
    # Why the delivery was ignored; None for processed deliveries.
    reason: str | None = None

51 

52 

@dataclass(frozen=True)
class GitHubPullRequestResult:
    """Immutable, persistable summary of a pull request create/update call."""

    # Head branch that backs the pull request.
    branch_name: str
    # Pull request title.
    title: str
    # Browser URL of the pull request.
    url: str
    # Pull request state string (for example "open").
    state: str
    # Pull request number, when one is known.
    number: int | None = None

62 

63 

def github_app_manifest() -> dict[str, Any]:
    """Return the public GitHub App metadata exposed by the API.

    Returns:
        A dict holding the app name, slug and id, the project repository,
        the webhook/setup/install URLs, the subscribed events, the
        permission map, an overall ``configured`` flag and a per-setting
        ``configured_fields`` map. Secret settings are only reported as
        booleans, never by value. ``install_url`` is ``None`` when no app
        slug is configured.
    """

    slug = settings.github_app_slug
    install_url = (
        f"https://github.com/apps/{slug}/installations/new" if slug else None
    )
    configured_fields = {
        "github_app_id": bool(settings.github_app_id),
        "github_app_slug": bool(slug),
        "github_webhook_secret": bool(settings.github_webhook_secret),
        "github_app_private_key": bool(settings.github_app_private_key),
    }
    return {
        "name": settings.github_app_name,
        "slug": slug,
        "app_id": settings.github_app_id,
        "repository": f"{settings.project_owner}/{settings.project_repo}",
        "webhook_url": f"{settings.api_base_url.rstrip('/')}/integrations/github/webhooks",
        "setup_url": f"{settings.public_base_url.rstrip('/')}/app?setup=github-app",
        "install_url": install_url,
        "events": list(SUPPORTED_GITHUB_EVENTS),
        # Hand out a copy so callers cannot mutate the module-level constant
        # (the events list above is already a copy).
        "permissions": dict(GITHUB_APP_PERMISSIONS),
        # The app counts as configured only when every required setting is set.
        "configured": all(configured_fields.values()),
        "configured_fields": configured_fields,
    }

95 

96 

def verify_github_signature(body: bytes, signature_header: str | None) -> None:
    """Validate a GitHub webhook signature against the configured shared secret.

    Args:
        body: Raw request body exactly as received.
        signature_header: Signature header value sent with the delivery,
            expected in the form ``sha256=<hex digest>``.

    Raises:
        HTTPException: 503 when no webhook secret is configured (unless
            ``settings.debug`` is set, in which case verification is
            skipped); 401 when the header is missing, malformed, or does
            not match the HMAC-SHA256 of ``body``.
    """

    secret = settings.github_webhook_secret
    if not secret:
        if settings.debug:
            # Debug mode deliberately accepts unsigned deliveries.
            return
        raise HTTPException(
            status_code=503,
            detail="GitHub webhook secret is not configured",
        )

    if not signature_header:
        raise HTTPException(status_code=401, detail="Missing GitHub webhook signature")
    if not signature_header.startswith("sha256="):
        raise HTTPException(status_code=401, detail="Invalid GitHub webhook signature")

    digest = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    expected = f"sha256={digest}"
    # hmac.compare_digest raises TypeError for str arguments that contain
    # non-ASCII characters. The header is attacker-controlled, so compare
    # bytes instead; a malformed header then yields 401 rather than a crash.
    # "replace" keeps encoding total; the substituted "?" can never appear
    # in the expected hex digest.
    if not hmac.compare_digest(
        expected.encode("utf-8"),
        signature_header.encode("utf-8", errors="replace"),
    ):
        raise HTTPException(status_code=401, detail="Invalid GitHub webhook signature")

123 

124 

def analyze_github_webhook(
    event_name: str,
    payload: dict[str, Any],
) -> GitHubWebhookAction:
    """Classify a GitHub webhook delivery into the action Documint should take.

    Args:
        event_name: GitHub event name for the delivery.
        payload: Decoded JSON body of the delivery.

    Returns:
        A ``GitHubWebhookAction``. ``ping`` deliveries, unsupported events
        and actions outside the per-event allow-lists come back with
        ``kind="ignored"`` and a ``reason``. Installation events yield
        ``kind="installation_sync"``. Push, pull request and release events
        yield ``kind="drift"`` with a ``DriftJobRequest`` whose ``project_id``
        is left empty.
    """

    repo_name = _repository_full_name(payload)
    install_id = _installation_id(payload)

    def ignored(reason: str) -> GitHubWebhookAction:
        # Decision for deliveries that need no further work.
        return GitHubWebhookAction(
            should_process=False,
            kind="ignored",
            event_name=event_name,
            repository=repo_name,
            installation_id=install_id,
            reason=reason,
        )

    def installation_sync() -> GitHubWebhookAction:
        # Both installation events resolve to the same sync decision.
        return GitHubWebhookAction(
            should_process=True,
            kind="installation_sync",
            event_name=event_name,
            repository=repo_name,
            installation_id=install_id,
            ref=install_id,
        )

    def drift(
        signal: SourceSignalType,
        ref: Any,
        files: tuple[str, ...] = (),
    ) -> GitHubWebhookAction:
        # Decision that schedules a drift job for the given signal.
        return GitHubWebhookAction(
            should_process=True,
            kind="drift",
            event_name=event_name,
            repository=repo_name,
            installation_id=install_id,
            drift_request=DriftJobRequest(
                project_id="",
                signal_type=signal,
                changed_files=list(files),
            ),
            changed_files=files,
            ref=ref,
        )

    if event_name == "ping":
        return ignored("ping")
    if event_name not in SUPPORTED_GITHUB_EVENTS:
        return ignored("unsupported_event")

    if event_name == "push":
        # Push is the only event handled without looking at "action".
        pushed_files = tuple(_collect_push_files(payload))
        return drift(
            SourceSignalType.PUSH,
            payload.get("after") or payload.get("ref"),
            pushed_files,
        )

    action = payload.get("action")

    if event_name == "installation":
        if action in GITHUB_INSTALLATION_ACTIONS:
            return installation_sync()
        return ignored(f"ignored_installation_action:{action}")

    if event_name == "installation_repositories":
        if action in GITHUB_INSTALLATION_REPO_ACTIONS:
            return installation_sync()
        return ignored(f"ignored_installation_repositories_action:{action}")

    if event_name == "pull_request":
        if action in GITHUB_PR_ACTIONS:
            return drift(
                SourceSignalType.PULL_REQUEST,
                _nested_value(payload, "pull_request", "head", "sha"),
            )
        return ignored(f"ignored_pull_request_action:{action}")

    # Every remaining supported event is handled as a release.
    if action in GITHUB_RELEASE_ACTIONS:
        return drift(
            SourceSignalType.RELEASE,
            _nested_value(payload, "release", "tag_name"),
        )
    return ignored(f"ignored_release_action:{action}")

258 

259 

def list_installation_repositories(installation_id: str) -> list[dict[str, Any]]:
    """Return every repository visible to a GitHub App installation.

    Pages through ``/installation/repositories`` 100 entries at a time and
    stops at the first short page, or when the ``repositories`` field of a
    page is not a list. Entries that are not dicts are dropped.

    Args:
        installation_id: Installation to mint an access token for.

    Returns:
        The repository objects collected across all pages.
    """

    access_token = get_installation_access_token(installation_id)
    page_size = 100
    collected: list[dict[str, Any]] = []
    page_number = 1
    while True:
        body = _github_request(
            "GET",
            "/installation/repositories",
            token=access_token,
            params={"per_page": page_size, "page": page_number},
        ).json()
        batch = body.get("repositories", [])
        if not isinstance(batch, list):
            break
        collected += [entry for entry in batch if isinstance(entry, dict)]
        if len(batch) < page_size:
            # A short page means there is nothing left to fetch.
            break
        page_number += 1
    return collected

284 

285 

def create_or_update_pull_request(
    *,
    installation_id: str,
    owner: str,
    repo: str,
    base_branch: str,
    branch_name: str,
    target_path: str,
    content_markdown: str,
    title: str,
    body: str,
    commit_message: str,
) -> GitHubPullRequestResult:
    """Commit a file to a working branch and open or refresh its pull request.

    Steps: mint an installation token, make sure ``branch_name`` exists
    (created from the head of ``base_branch`` when missing), write
    ``content_markdown`` to ``target_path`` on that branch, then either
    update the title/body of the open pull request for the branch or create
    a new one against ``base_branch``.

    Returns:
        A ``GitHubPullRequestResult`` built from GitHub's response.

    Raises:
        HTTPException: Propagated from the token and GitHub request helpers.
        KeyError: If GitHub's pull request response has no ``html_url``.
    """

    token = get_installation_access_token(installation_id)
    head_of_base = _get_branch_sha(owner, repo, base_branch, token)
    _ensure_branch(owner, repo, branch_name, head_of_base, token)

    # Prefer the blob SHA on the working branch and fall back to the base
    # branch when the file is not found there.
    blob_sha = _get_content_sha(owner, repo, target_path, branch_name, token)
    if blob_sha is None:
        blob_sha = _get_content_sha(owner, repo, target_path, base_branch, token)

    _upsert_file(
        owner=owner,
        repo=repo,
        path=target_path,
        branch=branch_name,
        content=content_markdown,
        message=commit_message,
        token=token,
        sha=blob_sha,
    )

    open_pr = _find_existing_pull_request(
        owner=owner,
        repo=repo,
        base_branch=base_branch,
        branch_name=branch_name,
        token=token,
    )
    if open_pr is None:
        response = _github_request(
            "POST",
            f"/repos/{owner}/{repo}/pulls",
            token=token,
            json_body={
                "title": title,
                "body": body,
                "head": branch_name,
                "base": base_branch,
                "maintainer_can_modify": True,
            },
        )
    else:
        response = _github_request(
            "PATCH",
            f"/repos/{owner}/{repo}/pulls/{open_pr['number']}",
            token=token,
            json_body={"title": title, "body": body},
        )

    pull = response.json()
    pull_number = pull.get("number")
    return GitHubPullRequestResult(
        branch_name=branch_name,
        title=str(pull.get("title") or title),
        url=str(pull["html_url"]),
        state=str(pull.get("state") or "open"),
        number=int(pull_number) if pull_number else None,
    )

357 

358 

def get_installation_access_token(installation_id: str) -> str:
    """Mint a short-lived installation token for a GitHub App installation.

    Signs an RS256 app JWT with the configured private key and exchanges it
    at ``/app/installations/{installation_id}/access_tokens``.

    Args:
        installation_id: Id of the installation to authenticate as.

    Returns:
        The installation access token string.

    Raises:
        HTTPException: 503 when the app id or private key is not configured;
            502 when GitHub's response carries no usable token (request
            failures are raised by ``_github_request``).
    """

    app_id = settings.github_app_id
    private_key = settings.github_app_private_key
    if not app_id or not private_key:
        raise HTTPException(
            status_code=503,
            detail="GitHub App credentials are not configured for installation automation",
        )

    now = datetime.now(tz=UTC)
    claims = {
        # Issued-at is backdated by a minute, presumably to tolerate clock
        # skew between this host and GitHub.
        "iat": int((now - timedelta(seconds=60)).timestamp()),
        "exp": int((now + timedelta(minutes=9)).timestamp()),
        # Recent PyJWT releases reject a non-string issuer when encoding, so
        # coerce the app id in case it is configured as an integer.
        "iss": str(app_id),
    }
    app_jwt = jwt.encode(
        claims,
        _normalized_private_key(private_key),
        algorithm="RS256",
    )
    response = _github_request(
        "POST",
        f"/app/installations/{installation_id}/access_tokens",
        bearer_token=app_jwt,
    )
    body = response.json()
    # Treat a non-object body like a missing token instead of crashing.
    token = body.get("token") if isinstance(body, dict) else None
    if not isinstance(token, str) or not token:
        raise HTTPException(
            status_code=502,
            detail="GitHub did not return an installation access token",
        )
    return token

390 

391 

def _github_request(
    method: str,
    path: str,
    *,
    token: str | None = None,
    bearer_token: str | None = None,
    json_body: dict[str, Any] | None = None,
    params: dict[str, str | int | float | bool | None] | None = None,
) -> httpx.Response:
    """Send one request to the GitHub API and return the successful response.

    Args:
        method: HTTP method.
        path: Path relative to ``settings.github_api_url``.
        token: Sent as ``Authorization: token <token>`` when given.
        bearer_token: Sent as ``Authorization: Bearer <token>`` when given;
            takes precedence over ``token`` if both are supplied.
        json_body: Optional JSON request body.
        params: Optional query parameters.

    Raises:
        HTTPException: 502 for any response with status 400 or above. Other
            helpers in this module inspect the detail text to recognise
            404s, so its format must stay stable.
    """

    if bearer_token is not None:
        authorization = f"Bearer {bearer_token}"
    elif token is not None:
        authorization = f"token {token}"
    else:
        authorization = None

    request_headers = {
        "Accept": "application/vnd.github+json",
        "X-GitHub-Api-Version": settings.github_api_version,
        "User-Agent": "documint/0.3",
    }
    if authorization is not None:
        request_headers["Authorization"] = authorization

    # A fresh client is opened for each call and closed on exit.
    with httpx.Client(base_url=settings.github_api_url, timeout=20.0) as client:
        response = client.request(
            method,
            path,
            headers=request_headers,
            json=json_body,
            params=params,
        )

    if response.status_code < 400:
        return response
    error_text = response.text
    raise HTTPException(
        status_code=502,
        detail=f"GitHub API error for {method} {path}: {response.status_code} {error_text}",
    )

426 

427 

def _get_branch_sha(owner: str, repo: str, branch: str, token: str) -> str:
    """Return the SHA that the head of ``branch`` points at.

    Raises:
        HTTPException: 502 when the ref payload has no non-empty string SHA
            (request failures are raised by ``_github_request``).
    """

    # The branch name is fully percent-encoded, including any slashes.
    ref_path = f"/repos/{owner}/{repo}/git/ref/heads/{quote(branch, safe='')}"
    ref_payload = _github_request("GET", ref_path, token=token).json()
    head_sha = ref_payload.get("object", {}).get("sha")
    if isinstance(head_sha, str) and head_sha:
        return head_sha
    raise HTTPException(
        status_code=502, detail="GitHub branch ref did not include a SHA"
    )

440 

441 

def _ensure_branch(
    owner: str, repo: str, branch: str, base_sha: str, token: str
) -> None:
    """Create ``branch`` at ``base_sha`` unless it already exists.

    Raises:
        HTTPException: For any GitHub failure other than the branch lookup
            returning 404, and for failures while creating the ref.
    """

    ref_path = f"/repos/{owner}/{repo}/git/ref/heads/{quote(branch, safe='')}"
    # _github_request reports failures as
    # "GitHub API error for <METHOD> <path>: <status> <body>". Match the
    # status in its exact position: a plain substring test for "404" would
    # also fire on a "404" inside the path or the response body and
    # misread unrelated errors as "branch missing".
    not_found_prefix = f"GitHub API error for GET {ref_path}: 404 "
    try:
        _github_request("GET", ref_path, token=token)
    except HTTPException as exc:
        if not str(exc.detail).startswith(not_found_prefix):
            raise
    else:
        # The branch is already there; nothing to create.
        return

    _github_request(
        "POST",
        f"/repos/{owner}/{repo}/git/refs",
        token=token,
        json_body={"ref": f"refs/heads/{branch}", "sha": base_sha},
    )

461 

462 

def _get_content_sha(
    owner: str,
    repo: str,
    path: str,
    branch: str,
    token: str,
) -> str | None:
    """Return the blob SHA of ``path`` on ``branch``.

    Returns:
        The SHA string, or ``None`` when GitHub answers 404 for the path or
        the response is not a single object carrying a non-empty ``sha``
        (for example a directory listing).

    Raises:
        HTTPException: For any GitHub failure other than a 404.
    """

    content_path = f"/repos/{owner}/{repo}/contents/{quote(path, safe='/')}"
    # _github_request reports failures as
    # "GitHub API error for <METHOD> <path>: <status> <body>". Match the
    # status in its exact position: a plain substring test for "404" would
    # also fire on a "404" inside the path or the response body.
    not_found_prefix = f"GitHub API error for GET {content_path}: 404 "
    try:
        payload = _github_request(
            "GET",
            content_path,
            token=token,
            params={"ref": branch},
        ).json()
    except HTTPException as exc:
        if str(exc.detail).startswith(not_found_prefix):
            return None
        raise

    # The contents endpoint returns a JSON array when the path is a
    # directory; there is no single blob SHA to report in that case.
    if not isinstance(payload, dict):
        return None
    sha = payload.get("sha")
    return sha if isinstance(sha, str) and sha else None

483 

484 

def _upsert_file(
    *,
    owner: str,
    repo: str,
    path: str,
    branch: str,
    content: str,
    message: str,
    token: str,
    sha: str | None,
) -> None:
    """Create or overwrite ``path`` on ``branch`` via the contents endpoint.

    Args:
        content: New file text; sent UTF-8 encoded and base64 wrapped.
        message: Commit message for the change.
        sha: Blob SHA to include in the request; omitted when falsy.
    """

    encoded_content = base64.b64encode(content.encode("utf-8")).decode("ascii")
    request_body: dict[str, Any] = {
        "message": message,
        "content": encoded_content,
        "branch": branch,
    }
    if sha:
        request_body["sha"] = sha

    contents_path = f"/repos/{owner}/{repo}/contents/{quote(path, safe='/')}"
    _github_request("PUT", contents_path, token=token, json_body=request_body)

509 

510 

def _find_existing_pull_request(
    *,
    owner: str,
    repo: str,
    base_branch: str,
    branch_name: str,
    token: str,
) -> dict[str, Any] | None:
    """Return the first open pull request from ``branch_name`` into ``base_branch``.

    Returns:
        The first entry of GitHub's result list when it is a dict, otherwise
        ``None`` (empty list, non-list response, or non-dict first entry).
    """

    query = {
        "state": "open",
        "base": base_branch,
        "head": f"{owner}:{branch_name}",
    }
    matches = _github_request(
        "GET",
        f"/repos/{owner}/{repo}/pulls",
        token=token,
        params=query,
    ).json()
    if isinstance(matches, list) and matches and isinstance(matches[0], dict):
        return matches[0]
    return None

529 

530 

531def _normalized_private_key(raw_private_key: str) -> str: 

532 return raw_private_key.replace("\\n", "\n").strip() 

533 

534 

535def _repository_full_name(payload: dict[str, Any]) -> str | None: 

536 repository = payload.get("repository") 

537 if not isinstance(repository, dict): 

538 return None 

539 full_name = repository.get("full_name") 

540 return full_name if isinstance(full_name, str) and full_name.strip() else None 

541 

542 

543def _installation_id(payload: dict[str, Any]) -> str | None: 

544 installation = payload.get("installation") 

545 if not isinstance(installation, dict): 

546 return None 

547 installation_id = installation.get("id") 

548 if isinstance(installation_id, int): 

549 return str(installation_id) 

550 if isinstance(installation_id, str) and installation_id.strip(): 

551 return installation_id 

552 return None 

553 

554 

555def _collect_push_files(payload: dict[str, Any]) -> list[str]: 

556 changed_files: set[str] = set() 

557 commits = payload.get("commits") 

558 if not isinstance(commits, list): 

559 commits = [] 

560 for commit in commits: 

561 if not isinstance(commit, dict): 

562 continue 

563 for key in ("added", "modified", "removed"): 

564 values = commit.get(key) 

565 if not isinstance(values, list): 

566 continue 

567 changed_files.update( 

568 path.strip() 

569 for path in values 

570 if isinstance(path, str) and path.strip() 

571 ) 

572 return sorted(changed_files) 

573 

574 

575def _nested_value(payload: dict[str, Any], *keys: str) -> str | None: 

576 current: Any = payload 

577 for key in keys: 

578 if not isinstance(current, dict): 

579 return None 

580 current = current.get(key) 

581 return current if isinstance(current, str) and current.strip() else None