Coverage for session_buddy / utils / git_operations.py: 74.51%

222 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1#!/usr/bin/env python3 

2"""Git operations utilities for session management.""" 

3 

4from __future__ import annotations 

5 

6import subprocess # nosec B404 

7from dataclasses import dataclass 

8from datetime import datetime 

9from pathlib import Path 

10from typing import Any 

11 

12 

@dataclass
class WorktreeInfo:
    """Describe a single git worktree.

    Only ``path`` and ``branch`` are required; every boolean flag defaults
    to ``False`` so callers set just the attributes they know about.
    """

    # Filesystem location of the worktree.
    path: Path
    # Branch label; producers in this module may also store a
    # "HEAD (<sha>)" string, a raw ref/commit id, or "unknown" here.
    branch: str
    # Flags below mirror the attributes parsed from
    # `git worktree list --porcelain` elsewhere in this module.
    is_bare: bool = False
    is_detached: bool = False
    # True when the worktree is the main checkout rather than a linked one.
    is_main_worktree: bool = False
    locked: bool = False
    prunable: bool = False

24 

25 

def is_git_repository(directory: str | Path) -> bool:
    """Report whether ``directory`` directly contains a ``.git`` entry.

    Both layouts count: a ``.git`` directory (regular repository) and a
    ``.git`` file (linked worktree). Parent directories are not searched.
    """
    root = Path(directory) if isinstance(directory, str) else directory
    marker = root / ".git"
    if not marker.exists():
        return False
    return marker.is_file() or marker.is_dir()

33 

34 

def is_git_worktree(directory: Path) -> bool:
    """Report whether ``directory`` is a linked worktree, not the main repo.

    The test is purely structural: the directory must contain a ``.git``
    entry that is a regular file (a main repository has a directory there).
    A string argument is tolerated and converted to ``Path``.
    """
    root = Path(directory) if isinstance(directory, str) else directory
    pointer = root / ".git"
    if not pointer.exists():
        return False
    return pointer.is_file()

42 

43 

def get_git_root(directory: str | Path) -> Path | None:
    """Return the repository top-level directory for ``directory``.

    Returns:
        The path printed by ``git rev-parse --show-toplevel``, or ``None``
        when ``directory`` has no ``.git`` entry or git exits non-zero.
    """
    if not is_git_repository(directory):
        return None

    command = ["git", "rev-parse", "--show-toplevel"]
    try:
        completed = subprocess.run(
            command,
            cwd=directory,
            text=True,
            capture_output=True,
            check=True,
        )
    except subprocess.CalledProcessError:
        return None
    return Path(completed.stdout.strip())

60 

61 

def get_worktree_info(directory: Path) -> WorktreeInfo | None:
    """Describe the worktree checked out at ``directory``.

    Runs, in order, ``git branch --show-current``, (only when that prints
    nothing) ``git rev-parse --short HEAD``, and
    ``git rev-parse --show-toplevel``.

    Returns:
        A ``WorktreeInfo`` whose ``branch`` is the current branch name, or
        ``"HEAD (<short-sha>)"`` with ``is_detached=True`` when no branch is
        checked out. ``None`` when ``directory`` has no ``.git`` entry or
        any git command exits non-zero.
    """
    if not is_git_repository(directory):
        return None

    def capture(*git_args: str) -> str:
        """Run ``git <git_args>`` inside ``directory``; return stripped stdout."""
        completed = subprocess.run(
            ["git", *git_args],
            capture_output=True,
            text=True,
            cwd=directory,
            check=True,
        )
        return completed.stdout.strip()

    try:
        branch = capture("branch", "--show-current")

        # An empty branch name means HEAD is detached; label it by commit.
        is_detached = not branch
        if is_detached:
            short_sha = capture("rev-parse", "--short", "HEAD")
            branch = f"HEAD ({short_sha})"

        # Use git's own notion of the worktree root as the reported path.
        toplevel = Path(capture("rev-parse", "--show-toplevel"))

        return WorktreeInfo(
            path=toplevel,
            branch=branch,
            is_detached=is_detached,
            is_main_worktree=not is_git_worktree(directory),
        )
    except subprocess.CalledProcessError:
        return None

110 

111 

112def _process_worktree_line(line: str, current_worktree: dict[str, Any]) -> None: 

113 """Process a single line from git worktree list --porcelain output.""" 

114 if line.startswith("worktree "): 

115 current_worktree["path"] = line[9:] # Remove 'worktree ' prefix 

116 elif line.startswith("HEAD "): 

117 current_worktree["head"] = line[5:] # Remove 'HEAD ' prefix 

118 elif line.startswith("branch "): 118 ↛ 120line 118 didn't jump to line 120 because the condition on line 118 was always true

119 current_worktree["branch"] = line[7:] # Remove 'branch ' prefix 

120 elif line == "bare": 

121 current_worktree["bare"] = True 

122 elif line == "detached": 

123 current_worktree["detached"] = True 

124 elif line.startswith("locked"): 

125 current_worktree["locked"] = True 

126 elif line == "prunable": 

127 current_worktree["prunable"] = True 

128 

129 

def list_worktrees(directory: Path) -> list[WorktreeInfo]:
    """Return every worktree git reports for the repository at ``directory``.

    An empty list is returned when ``directory`` has no ``.git`` entry or
    the ``git worktree list`` command could not be run successfully.
    """
    if is_git_repository(directory):
        completed = _run_git_worktree_list(directory)
        if completed is not None:
            return _parse_worktree_list_output(completed.stdout)
    return []

140 

141 

def _run_git_worktree_list(directory: Path) -> subprocess.CompletedProcess[str] | None:
    """Invoke ``git worktree list --porcelain`` inside ``directory``.

    Returns:
        The completed process (stdout/stderr captured as text), or ``None``
        when git exits with a non-zero status.
    """
    command = ["git", "worktree", "list", "--porcelain"]
    try:
        completed = subprocess.run(
            command,
            cwd=directory,
            text=True,
            capture_output=True,
            check=True,
        )
    except subprocess.CalledProcessError:
        return None
    return completed

154 

155 

def _parse_worktree_list_output(output: str) -> list[WorktreeInfo]:
    """Turn ``git worktree list --porcelain`` text into ``WorktreeInfo`` objects.

    Entries are separated by blank lines; each non-blank line contributes one
    attribute to the entry currently being accumulated.
    """
    parsed: list[WorktreeInfo] = []
    pending: dict[str, Any] = {}

    for raw_line in output.strip().split("\n"):
        if raw_line:
            _process_worktree_line(raw_line, pending)
        elif pending:
            # Blank separator: the accumulated entry is complete.
            parsed.append(_parse_worktree_entry(pending))
            pending = {}

    # The final entry has no trailing blank line once the text is stripped.
    if pending:
        parsed.append(_parse_worktree_entry(pending))

    return parsed

175 

176 

def _parse_worktree_entry(entry: dict[str, Any]) -> WorktreeInfo:
    """Build a ``WorktreeInfo`` from one accumulated porcelain entry.

    ``branch`` falls back to the entry's ``head`` value and then to
    ``"unknown"``. Missing flags default to ``False``.
    """
    location = Path(entry.get("path", ""))

    if "branch" in entry:
        label = entry["branch"]
    else:
        label = entry.get("head", "unknown")

    # Treated as the main worktree when the path exists on disk and its
    # ".git" entry is not a regular file (linked worktrees have a .git file).
    is_main = location.exists() and not (location / ".git").is_file()

    return WorktreeInfo(
        path=location,
        branch=str(label),
        is_bare=entry.get("bare", False),
        is_detached=entry.get("detached", False),
        is_main_worktree=is_main,
        locked=entry.get("locked", False),
        prunable=entry.get("prunable", False),
    )

194 

195 

def get_git_status(directory: Path) -> tuple[list[str], list[str]]:
    """Collect changed and untracked paths via ``git status --porcelain``.

    Returns:
        ``(modified_files, untracked_files)``. Both lists are empty when
        ``directory`` has no ``.git`` entry, git exits non-zero, or the
        working tree is clean.
    """
    if not is_git_repository(directory):
        return [], []

    try:
        completed = subprocess.run(
            ["git", "status", "--porcelain"],
            cwd=directory,
            text=True,
            capture_output=True,
            check=True,
        )
    except subprocess.CalledProcessError:
        return [], []

    porcelain = completed.stdout.strip()
    status_lines = porcelain.split("\n") if porcelain else []
    return _parse_git_status(status_lines)

219 

220 

221def _parse_git_status(status_lines: list[str]) -> tuple[list[str], list[str]]: 

222 """Parse git status output into modified and untracked files.""" 

223 modified_files = [] 

224 untracked_files = [] 

225 

226 for line in status_lines: 

227 if line: 227 ↛ 226line 227 didn't jump to line 226 because the condition on line 227 was always true

228 # Extract the status (first 2 characters) and file path 

229 status = line[:2] 

230 filepath = line[2:].lstrip() # Remove leading whitespace 

231 

232 if status == "??": 

233 untracked_files.append(filepath) 

234 elif status.strip(): # If status has meaningful content (not just spaces) 234 ↛ 226line 234 didn't jump to line 226 because the condition on line 234 was always true

235 modified_files.append(filepath) 

236 

237 return modified_files, untracked_files 

238 

239 

def stage_files(directory: Path, files: list[str]) -> bool:
    """Stage changes in ``directory`` for the next commit.

    Note that ``files`` only acts as a guard: when it is empty nothing is
    staged, otherwise ``git add -A`` stages *all* changes (modified, deleted
    and new files), not just the listed paths.

    Returns:
        ``True`` when ``git add -A`` succeeds; ``False`` when ``directory``
        has no ``.git`` entry, ``files`` is empty, or git exits non-zero.
    """
    can_stage = is_git_repository(directory) and bool(files)
    if not can_stage:
        return False

    try:
        subprocess.run(
            ["git", "add", "-A"],
            cwd=directory,
            text=True,
            capture_output=True,
            check=True,
        )
    except subprocess.CalledProcessError:
        return False
    return True

258 

259 

def get_staged_files(directory: Path) -> list[str]:
    """Return the paths currently staged in the index.

    Uses ``git diff --cached --name-only``. The result is an empty list when
    ``directory`` has no ``.git`` entry, git exits non-zero, or nothing is
    staged.
    """
    if not is_git_repository(directory):
        return []

    try:
        completed = subprocess.run(
            ["git", "diff", "--cached", "--name-only"],
            cwd=directory,
            text=True,
            capture_output=True,
            check=True,
        )
    except subprocess.CalledProcessError:
        return []

    names = completed.stdout.strip()
    if not names:
        return []
    return names.split("\n")

281 

282 

def create_commit(directory: Path, message: str) -> tuple[bool, str]:
    """Create a git commit with the given message.

    Args:
        directory: Directory containing the ``.git`` entry; used as the
            working directory for the git commands.
        message: Commit message passed to ``git commit -m``.

    Returns:
        tuple: (success, commit_hash or error_message). On success the second
        item is the first 8 characters of ``git rev-parse HEAD``. On failure
        it is git's stderr, or its stdout when stderr is empty, or the
        exception text as a last resort.

    """
    if not is_git_repository(directory):
        return False, "Not a git repository"

    try:
        subprocess.run(
            ["git", "commit", "-m", message],
            capture_output=True,
            text=True,
            cwd=directory,
            check=True,
        )

        # Get commit hash
        hash_result = subprocess.run(
            ["git", "rev-parse", "HEAD"],
            capture_output=True,
            text=True,
            cwd=directory,
            check=True,
        )

        commit_hash = hash_result.stdout.strip()[:8]
        return True, commit_hash

    except subprocess.CalledProcessError as e:
        # git reports some failures (e.g. "nothing to commit") on stdout, so
        # fall back to it before using the generic exception text.
        detail = (e.stderr or "").strip() or (e.stdout or "").strip()
        return False, detail or str(e)

316 

317 

318def _add_worktree_context_output( 

319 worktree_info: WorktreeInfo | None, 

320 output: list[str], 

321) -> None: 

322 """Add worktree context information to output.""" 

323 if worktree_info: 323 ↛ exitline 323 didn't return from function '_add_worktree_context_output' because the condition on line 323 was always true

324 if worktree_info.is_main_worktree: 324 ↛ 327line 324 didn't jump to line 327 because the condition on line 324 was always true

325 output.append(f"📝 Main repository on branch '{worktree_info.branch}'") 

326 else: 

327 output.append( 

328 f"🌿 Worktree on branch '{worktree_info.branch}' at {worktree_info.path}", 

329 ) 

330 

331 

332def _create_checkpoint_message( 

333 project: str, 

334 quality_score: int, 

335 worktree_info: WorktreeInfo | None, 

336) -> str: 

337 """Create the checkpoint commit message.""" 

338 timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") 

339 

340 # Enhanced commit message with worktree info 

341 worktree_suffix = "" 

342 if worktree_info and not worktree_info.is_main_worktree: 

343 worktree_suffix = f" [worktree: {worktree_info.branch}]" 

344 

345 commit_message = ( 

346 f"checkpoint: Session checkpoint - {timestamp}{worktree_suffix}\n\n" 

347 f"Automatic checkpoint commit via session-management MCP server\n" 

348 f"Project: {project}\n" 

349 f"Quality Score: {quality_score}/100" 

350 ) 

351 

352 if worktree_info: 

353 commit_message += f"\nBranch: {worktree_info.branch}" 

354 if not worktree_info.is_main_worktree: 

355 commit_message += f"\nWorktree: {worktree_info.path}" 

356 

357 return commit_message 

358 

359 

def _validate_git_repository(directory: Path) -> tuple[bool, str, list[str]]:
    """Check that ``directory`` is usable for a checkpoint commit.

    Returns:
        ``(True, "", [])`` for a git repository, otherwise
        ``(False, "Not a git repository", [<one informational message>])``.
    """
    if is_git_repository(directory):
        return True, "", []

    messages = ["ℹ️ Not a git repository - skipping commit"]
    return False, "Not a git repository", messages

367 

368 

def _check_for_changes(directory: Path) -> tuple[list[str], list[str], list[str]]:
    """Inspect the working tree and build the matching status messages.

    Returns:
        ``(modified_files, untracked_files, output_messages)``. When the tree
        is clean both file lists are empty and the messages contain a single
        "clean" notice; otherwise the messages describe the worktree, the
        change counts and any untracked files.
    """
    worktree_info = get_worktree_info(directory)
    modified, untracked = get_git_status(directory)

    if not (modified or untracked):
        return [], [], ["✅ Working directory is clean - no changes to commit"]

    messages: list[str] = []
    _add_worktree_context_output(worktree_info, messages)
    messages.append(
        f"📝 Found {len(modified)} modified files and {len(untracked)} untracked files",
    )

    if untracked:
        messages += _format_untracked_files(untracked)

    return modified, untracked, messages

388 

389 

def _perform_staging_and_commit(
    directory: Path,
    project: str,
    quality_score: int,
) -> tuple[bool, str, list[str]]:
    """Stage every change with ``git add -A`` and commit it as a checkpoint.

    Returns:
        ``(success, commit_hash_or_reason, output_messages)``. On success the
        second item is the first 8 characters of HEAD (``"unknown"`` if the
        hash lookup fails); on failure it is ``"staging failed"`` or
        ``"commit failed"`` and the messages include git's stderr.
    """

    def run_git(*git_args: str) -> subprocess.CompletedProcess[str]:
        """Run ``git <git_args>`` in ``directory`` without raising on failure."""
        return subprocess.run(
            ["git", *git_args],
            cwd=directory,
            capture_output=True,
            text=True,
            check=False,
        )

    messages: list[str] = []

    # The message is fixed up front so its timestamp reflects the start of
    # the operation.
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    commit_message = f"checkpoint: {project} (quality: {quality_score}/100) - {stamp}"

    staged = run_git("add", "-A")
    if staged.returncode != 0:
        messages.append(f"⚠️ Failed to stage changes: {staged.stderr.strip()}")
        return False, "staging failed", messages

    committed = run_git("commit", "-m", commit_message)
    if committed.returncode != 0:
        messages.append(f"⚠️ Commit failed: {committed.stderr.strip()}")
        return False, "commit failed", messages

    # A failed hash lookup does not undo the commit, so it is not an error.
    head = run_git("rev-parse", "HEAD")
    if head.returncode == 0:
        commit_hash = head.stdout.strip()[:8]
    else:
        commit_hash = "unknown"

    messages.append(f"✅ Checkpoint commit created successfully ({commit_hash})")
    messages.append(f" Message: {commit_message}")
    return True, commit_hash, messages

450 

451 

def create_checkpoint_commit(
    directory: Path,
    project: str,
    quality_score: int,
) -> tuple[bool, str, list[str]]:
    """Create an automatic checkpoint commit.

    Outcomes, in order of evaluation: not a repository -> failure; clean tree
    -> ``(True, "clean", ...)``; modified files present -> stage and commit;
    only untracked files -> ``(False, "No staged changes", ...)``. Any
    exception raised along the way is reported as a failure, not propagated.

    Returns:
        tuple: (success, commit_hash_or_error, output_messages)

    """
    valid, error, output = _validate_git_repository(directory)
    if not valid:
        return False, error, output

    try:
        modified_files, untracked_files, check_output = _check_for_changes(directory)
        output += check_output

        if not (modified_files or untracked_files):
            return True, "clean", output

        if modified_files:
            success, result, commit_output = _perform_staging_and_commit(
                directory,
                project,
                quality_score,
            )
            output += commit_output
            return success, result, output

        # Reaching here means only untracked files are present.
        if untracked_files:
            output.extend(
                (
                    "ℹ️ No staged changes to commit",
                    " 💡 Add untracked files with 'git add' if you want to include them",
                )
            )
            return False, "No staged changes", output

    except Exception as e:
        error_msg = f"Git operations error: {e}"
        output.append(f"⚠️ {error_msg}")
        return False, error_msg, output

    # Defensive fallback; every branch above returns.
    return False, "Unexpected error", output

500 

501 

502def _format_untracked_files(untracked_files: list[str]) -> list[str]: 

503 """Format untracked files display.""" 

504 output = [] 

505 output.append("🆕 Untracked files found:") 

506 

507 for file in untracked_files[:10]: # Limit to first 10 for display 

508 output.append(f"{file}") 

509 

510 if len(untracked_files) > 10: 510 ↛ 511line 510 didn't jump to line 511 because the condition on line 510 was never true

511 output.append(f" ... and {len(untracked_files) - 10} more") 

512 

513 output.extend( 

514 ( 

515 "⚠️ Please manually review and add untracked files if needed:", 

516 " Use: git add <file> for files you want to include", 

517 ) 

518 ) 

519 

520 return output