Coverage for session_mgmt_mcp/utils/git_operations.py: 10.45%

205 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-01 05:22 -0700

1#!/usr/bin/env python3 

2"""Git operations utilities for session management.""" 

3 

4import subprocess 

5from dataclasses import dataclass 

6from datetime import datetime 

7from pathlib import Path 

8 

9 

10@dataclass 

11class WorktreeInfo: 

12 """Information about a git worktree.""" 

13 

14 path: Path 

15 branch: str 

16 is_bare: bool = False 

17 is_detached: bool = False 

18 is_main_worktree: bool = False 

19 locked: bool = False 

20 prunable: bool = False 

21 

22 

23def is_git_repository(directory) -> bool: 

24 """Check if the given directory is a git repository or worktree.""" 

25 if isinstance(directory, str): 

26 directory = Path(directory) 

27 git_dir = directory / ".git" 

28 # Check for both main repo (.git directory) and worktree (.git file) 

29 return git_dir.exists() and (git_dir.is_dir() or git_dir.is_file()) 

30 

31 

32def is_git_worktree(directory: Path) -> bool: 

33 """Check if the directory is a git worktree (not the main repository).""" 

34 if isinstance(directory, str): 

35 directory = Path(directory) 

36 git_path = directory / ".git" 

37 # Worktrees have a .git file that points to the actual git directory 

38 return git_path.exists() and git_path.is_file() 

39 

40 

41def get_git_root(directory: Path) -> Path | None: 

42 """Get the root directory of the git repository.""" 

43 if not is_git_repository(directory): 

44 return None 

45 

46 try: 

47 result = subprocess.run( 

48 ["git", "rev-parse", "--show-toplevel"], 

49 capture_output=True, 

50 text=True, 

51 cwd=directory, 

52 check=True, 

53 ) 

54 return Path(result.stdout.strip()) 

55 except subprocess.CalledProcessError: 

56 return None 

57 

58 

59def get_worktree_info(directory: Path) -> WorktreeInfo | None: 

60 """Get information about the current worktree.""" 

61 if not is_git_repository(directory): 

62 return None 

63 

64 try: 

65 # Get current branch 

66 branch_result = subprocess.run( 

67 ["git", "branch", "--show-current"], 

68 capture_output=True, 

69 text=True, 

70 cwd=directory, 

71 check=True, 

72 ) 

73 branch = branch_result.stdout.strip() 

74 

75 # Check if detached HEAD 

76 is_detached = False 

77 if not branch: 

78 head_result = subprocess.run( 

79 ["git", "rev-parse", "--short", "HEAD"], 

80 capture_output=True, 

81 text=True, 

82 cwd=directory, 

83 check=True, 

84 ) 

85 branch = f"HEAD ({head_result.stdout.strip()})" 

86 is_detached = True 

87 

88 # Get worktree path (normalized) 

89 toplevel_result = subprocess.run( 

90 ["git", "rev-parse", "--show-toplevel"], 

91 capture_output=True, 

92 text=True, 

93 cwd=directory, 

94 check=True, 

95 ) 

96 path = Path(toplevel_result.stdout.strip()) 

97 

98 return WorktreeInfo( 

99 path=path, 

100 branch=branch, 

101 is_detached=is_detached, 

102 is_main_worktree=not is_git_worktree(directory), 

103 ) 

104 

105 except subprocess.CalledProcessError: 

106 return None 

107 

108 

109def _process_worktree_line(line: str, current_worktree: dict) -> None: 

110 """Process a single line from git worktree list --porcelain output.""" 

111 if line.startswith("worktree "): 

112 current_worktree["path"] = line[9:] # Remove 'worktree ' prefix 

113 elif line.startswith("HEAD "): 

114 current_worktree["head"] = line[5:] # Remove 'HEAD ' prefix 

115 elif line.startswith("branch "): 

116 current_worktree["branch"] = line[7:] # Remove 'branch ' prefix 

117 elif line == "bare": 

118 current_worktree["bare"] = True 

119 elif line == "detached": 

120 current_worktree["detached"] = True 

121 elif line.startswith("locked"): 

122 current_worktree["locked"] = True 

123 elif line == "prunable": 

124 current_worktree["prunable"] = True 

125 

126 

127def list_worktrees(directory: Path) -> list[WorktreeInfo]: 

128 """List all worktrees for the repository.""" 

129 if not is_git_repository(directory): 

130 return [] 

131 

132 try: 

133 result = subprocess.run( 

134 ["git", "worktree", "list", "--porcelain"], 

135 capture_output=True, 

136 text=True, 

137 cwd=directory, 

138 check=True, 

139 ) 

140 

141 worktrees = [] 

142 current_worktree = {} 

143 

144 for line in result.stdout.strip().split("\n"): 

145 if not line: 

146 if current_worktree: 

147 worktrees.append(_parse_worktree_entry(current_worktree)) 

148 current_worktree = {} 

149 continue 

150 

151 _process_worktree_line(line, current_worktree) 

152 

153 # Handle last worktree if exists 

154 if current_worktree: 

155 worktrees.append(_parse_worktree_entry(current_worktree)) 

156 

157 return worktrees 

158 

159 except subprocess.CalledProcessError: 

160 return [] 

161 

162 

163def _parse_worktree_entry(entry: dict) -> WorktreeInfo: 

164 """Parse a single worktree entry from git worktree list output.""" 

165 path = Path(entry.get("path", "")) 

166 branch = entry.get("branch", entry.get("head", "unknown")) 

167 

168 # Check if this is the main worktree (bare repos don't have .git file) 

169 is_main = not (path / ".git").is_file() if path.exists() else False 

170 

171 return WorktreeInfo( 

172 path=path, 

173 branch=branch, 

174 is_bare=entry.get("bare", False), 

175 is_detached=entry.get("detached", False), 

176 is_main_worktree=is_main, 

177 locked=entry.get("locked", False), 

178 prunable=entry.get("prunable", False), 

179 ) 

180 

181 

182def get_git_status(directory: Path) -> tuple[list[str], list[str]]: 

183 """Get modified and untracked files from git status.""" 

184 if not is_git_repository(directory): 

185 return [], [] 

186 

187 try: 

188 status_result = subprocess.run( 

189 ["git", "status", "--porcelain"], 

190 capture_output=True, 

191 text=True, 

192 cwd=directory, 

193 check=True, 

194 ) 

195 

196 status_lines = ( 

197 status_result.stdout.strip().split("\n") 

198 if status_result.stdout.strip() 

199 else [] 

200 ) 

201 

202 return _parse_git_status(status_lines) 

203 except subprocess.CalledProcessError: 

204 return [], [] 

205 

206 

207def _parse_git_status(status_lines: list[str]) -> tuple[list[str], list[str]]: 

208 """Parse git status output into modified and untracked files.""" 

209 modified_files = [] 

210 untracked_files = [] 

211 

212 for line in status_lines: 

213 if line: 

214 status = line[:2] 

215 filepath = line[3:] 

216 if status == "??": 

217 untracked_files.append(filepath) 

218 elif status.strip(): 

219 modified_files.append(filepath) 

220 

221 return modified_files, untracked_files 

222 

223 

224def stage_files(directory: Path, files: list[str]) -> bool: 

225 """Stage files for commit.""" 

226 if not is_git_repository(directory) or not files: 

227 return False 

228 

229 try: 

230 for file in files: 

231 subprocess.run( 

232 ["git", "add", file], 

233 cwd=directory, 

234 capture_output=True, 

235 check=True, 

236 ) 

237 return True 

238 except subprocess.CalledProcessError: 

239 return False 

240 

241 

242def get_staged_files(directory: Path) -> list[str]: 

243 """Get list of staged files.""" 

244 if not is_git_repository(directory): 

245 return [] 

246 

247 try: 

248 staged_result = subprocess.run( 

249 ["git", "diff", "--cached", "--name-only"], 

250 capture_output=True, 

251 text=True, 

252 cwd=directory, 

253 check=True, 

254 ) 

255 

256 return ( 

257 staged_result.stdout.strip().split("\n") 

258 if staged_result.stdout.strip() 

259 else [] 

260 ) 

261 except subprocess.CalledProcessError: 

262 return [] 

263 

264 

265def create_commit(directory: Path, message: str) -> tuple[bool, str]: 

266 """Create a git commit with the given message. 

267 

268 Returns: 

269 tuple: (success, commit_hash or error_message) 

270 

271 """ 

272 if not is_git_repository(directory): 

273 return False, "Not a git repository" 

274 

275 try: 

276 subprocess.run( 

277 ["git", "commit", "-m", message], 

278 capture_output=True, 

279 text=True, 

280 cwd=directory, 

281 check=True, 

282 ) 

283 

284 # Get commit hash 

285 hash_result = subprocess.run( 

286 ["git", "rev-parse", "HEAD"], 

287 capture_output=True, 

288 text=True, 

289 cwd=directory, 

290 check=True, 

291 ) 

292 

293 commit_hash = hash_result.stdout.strip()[:8] 

294 return True, commit_hash 

295 

296 except subprocess.CalledProcessError as e: 

297 return False, e.stderr.strip() if e.stderr else str(e) 

298 

299 

300def _add_worktree_context_output( 

301 worktree_info: WorktreeInfo | None, 

302 output: list[str], 

303) -> None: 

304 """Add worktree context information to output.""" 

305 if worktree_info: 

306 if worktree_info.is_main_worktree: 

307 output.append(f"📝 Main repository on branch '{worktree_info.branch}'") 

308 else: 

309 output.append( 

310 f"🌿 Worktree on branch '{worktree_info.branch}' at {worktree_info.path}", 

311 ) 

312 

313 

314def _create_checkpoint_message( 

315 project: str, 

316 quality_score: int, 

317 worktree_info: WorktreeInfo | None, 

318) -> str: 

319 """Create the checkpoint commit message.""" 

320 timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") 

321 

322 # Enhanced commit message with worktree info 

323 worktree_suffix = "" 

324 if worktree_info and not worktree_info.is_main_worktree: 

325 worktree_suffix = f" [worktree: {worktree_info.branch}]" 

326 

327 commit_message = ( 

328 f"checkpoint: Session checkpoint - {timestamp}{worktree_suffix}\n\n" 

329 f"Automatic checkpoint commit via session-management MCP server\n" 

330 f"Project: {project}\n" 

331 f"Quality Score: {quality_score}/100" 

332 ) 

333 

334 if worktree_info: 

335 commit_message += f"\nBranch: {worktree_info.branch}" 

336 if not worktree_info.is_main_worktree: 

337 commit_message += f"\nWorktree: {worktree_info.path}" 

338 

339 return commit_message 

340 

341 

342def _handle_staging_and_commit( 

343 directory: Path, 

344 modified_files: list[str], 

345 project: str, 

346 quality_score: int, 

347 worktree_info: WorktreeInfo | None, 

348 output: list[str], 

349) -> tuple[bool, str]: 

350 """Handle staging and committing of modified files.""" 

351 if not stage_files(directory, modified_files): 

352 output.append("⚠️ Failed to stage files") 

353 return False, "Failed to stage files" 

354 

355 staged_files = get_staged_files(directory) 

356 if not staged_files: 

357 output.append("ℹ️ No staged changes to commit") 

358 return False, "No staged changes" 

359 

360 commit_message = _create_checkpoint_message(project, quality_score, worktree_info) 

361 success, result = create_commit(directory, commit_message) 

362 

363 if success: 

364 timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") 

365 output.append(f"✅ Checkpoint commit created: {result}") 

366 output.append(f" Message: checkpoint: Session checkpoint - {timestamp}") 

367 output.append(" 💡 Use 'git reset HEAD~1' to undo if needed") 

368 return True, result 

369 output.append(f"⚠️ Commit failed: {result}") 

370 return False, result 

371 

372 

373def create_checkpoint_commit( 

374 directory: Path, 

375 project: str, 

376 quality_score: int, 

377) -> tuple[bool, str, list[str]]: 

378 """Create an automatic checkpoint commit. 

379 

380 Returns: 

381 tuple: (success, commit_hash_or_error, output_messages) 

382 

383 """ 

384 output = [] 

385 

386 if not is_git_repository(directory): 

387 output.append("ℹ️ Not a git repository - skipping commit") 

388 return False, "Not a git repository", output 

389 

390 try: 

391 # Get worktree info for enhanced commit messages 

392 worktree_info = get_worktree_info(directory) 

393 modified_files, untracked_files = get_git_status(directory) 

394 

395 if not modified_files and not untracked_files: 

396 output.append("✅ Working directory is clean - no changes to commit") 

397 return True, "clean", output 

398 

399 # Add worktree context to output 

400 _add_worktree_context_output(worktree_info, output) 

401 output.append( 

402 f"📝 Found {len(modified_files)} modified files and {len(untracked_files)} untracked files", 

403 ) 

404 

405 # Handle untracked files 

406 if untracked_files: 

407 output.extend(_format_untracked_files(untracked_files)) 

408 

409 # Stage and commit modified files 

410 if modified_files: 

411 success, result = _handle_staging_and_commit( 

412 directory, 

413 modified_files, 

414 project, 

415 quality_score, 

416 worktree_info, 

417 output, 

418 ) 

419 return success, result, output 

420 if untracked_files: 

421 output.append("ℹ️ No staged changes to commit") 

422 output.append( 

423 " 💡 Add untracked files with 'git add' if you want to include them", 

424 ) 

425 return False, "No staged changes", output 

426 

427 except Exception as e: 

428 error_msg = f"Git operations error: {e}" 

429 output.append(f"⚠️ {error_msg}") 

430 return False, error_msg, output 

431 

432 return False, "Unexpected error", output 

433 

434 

435def _format_untracked_files(untracked_files: list[str]) -> list[str]: 

436 """Format untracked files display.""" 

437 output = [] 

438 output.append("🆕 Untracked files found:") 

439 

440 for file in untracked_files[:10]: # Limit to first 10 for display 

441 output.append(f"{file}") 

442 

443 if len(untracked_files) > 10: 

444 output.append(f" ... and {len(untracked_files) - 10} more") 

445 

446 output.append("⚠️ Please manually review and add untracked files if needed:") 

447 output.append(" Use: git add <file> for files you want to include") 

448 

449 return output