Coverage for session_mgmt_mcp/utils/git_operations.py: 10.45%
205 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 05:22 -0700
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 05:22 -0700
1#!/usr/bin/env python3
2"""Git operations utilities for session management."""
4import subprocess
5from dataclasses import dataclass
6from datetime import datetime
7from pathlib import Path
10@dataclass
11class WorktreeInfo:
12 """Information about a git worktree."""
14 path: Path
15 branch: str
16 is_bare: bool = False
17 is_detached: bool = False
18 is_main_worktree: bool = False
19 locked: bool = False
20 prunable: bool = False
23def is_git_repository(directory) -> bool:
24 """Check if the given directory is a git repository or worktree."""
25 if isinstance(directory, str):
26 directory = Path(directory)
27 git_dir = directory / ".git"
28 # Check for both main repo (.git directory) and worktree (.git file)
29 return git_dir.exists() and (git_dir.is_dir() or git_dir.is_file())
32def is_git_worktree(directory: Path) -> bool:
33 """Check if the directory is a git worktree (not the main repository)."""
34 if isinstance(directory, str):
35 directory = Path(directory)
36 git_path = directory / ".git"
37 # Worktrees have a .git file that points to the actual git directory
38 return git_path.exists() and git_path.is_file()
41def get_git_root(directory: Path) -> Path | None:
42 """Get the root directory of the git repository."""
43 if not is_git_repository(directory):
44 return None
46 try:
47 result = subprocess.run(
48 ["git", "rev-parse", "--show-toplevel"],
49 capture_output=True,
50 text=True,
51 cwd=directory,
52 check=True,
53 )
54 return Path(result.stdout.strip())
55 except subprocess.CalledProcessError:
56 return None
59def get_worktree_info(directory: Path) -> WorktreeInfo | None:
60 """Get information about the current worktree."""
61 if not is_git_repository(directory):
62 return None
64 try:
65 # Get current branch
66 branch_result = subprocess.run(
67 ["git", "branch", "--show-current"],
68 capture_output=True,
69 text=True,
70 cwd=directory,
71 check=True,
72 )
73 branch = branch_result.stdout.strip()
75 # Check if detached HEAD
76 is_detached = False
77 if not branch:
78 head_result = subprocess.run(
79 ["git", "rev-parse", "--short", "HEAD"],
80 capture_output=True,
81 text=True,
82 cwd=directory,
83 check=True,
84 )
85 branch = f"HEAD ({head_result.stdout.strip()})"
86 is_detached = True
88 # Get worktree path (normalized)
89 toplevel_result = subprocess.run(
90 ["git", "rev-parse", "--show-toplevel"],
91 capture_output=True,
92 text=True,
93 cwd=directory,
94 check=True,
95 )
96 path = Path(toplevel_result.stdout.strip())
98 return WorktreeInfo(
99 path=path,
100 branch=branch,
101 is_detached=is_detached,
102 is_main_worktree=not is_git_worktree(directory),
103 )
105 except subprocess.CalledProcessError:
106 return None
109def _process_worktree_line(line: str, current_worktree: dict) -> None:
110 """Process a single line from git worktree list --porcelain output."""
111 if line.startswith("worktree "):
112 current_worktree["path"] = line[9:] # Remove 'worktree ' prefix
113 elif line.startswith("HEAD "):
114 current_worktree["head"] = line[5:] # Remove 'HEAD ' prefix
115 elif line.startswith("branch "):
116 current_worktree["branch"] = line[7:] # Remove 'branch ' prefix
117 elif line == "bare":
118 current_worktree["bare"] = True
119 elif line == "detached":
120 current_worktree["detached"] = True
121 elif line.startswith("locked"):
122 current_worktree["locked"] = True
123 elif line == "prunable":
124 current_worktree["prunable"] = True
127def list_worktrees(directory: Path) -> list[WorktreeInfo]:
128 """List all worktrees for the repository."""
129 if not is_git_repository(directory):
130 return []
132 try:
133 result = subprocess.run(
134 ["git", "worktree", "list", "--porcelain"],
135 capture_output=True,
136 text=True,
137 cwd=directory,
138 check=True,
139 )
141 worktrees = []
142 current_worktree = {}
144 for line in result.stdout.strip().split("\n"):
145 if not line:
146 if current_worktree:
147 worktrees.append(_parse_worktree_entry(current_worktree))
148 current_worktree = {}
149 continue
151 _process_worktree_line(line, current_worktree)
153 # Handle last worktree if exists
154 if current_worktree:
155 worktrees.append(_parse_worktree_entry(current_worktree))
157 return worktrees
159 except subprocess.CalledProcessError:
160 return []
163def _parse_worktree_entry(entry: dict) -> WorktreeInfo:
164 """Parse a single worktree entry from git worktree list output."""
165 path = Path(entry.get("path", ""))
166 branch = entry.get("branch", entry.get("head", "unknown"))
168 # Check if this is the main worktree (bare repos don't have .git file)
169 is_main = not (path / ".git").is_file() if path.exists() else False
171 return WorktreeInfo(
172 path=path,
173 branch=branch,
174 is_bare=entry.get("bare", False),
175 is_detached=entry.get("detached", False),
176 is_main_worktree=is_main,
177 locked=entry.get("locked", False),
178 prunable=entry.get("prunable", False),
179 )
182def get_git_status(directory: Path) -> tuple[list[str], list[str]]:
183 """Get modified and untracked files from git status."""
184 if not is_git_repository(directory):
185 return [], []
187 try:
188 status_result = subprocess.run(
189 ["git", "status", "--porcelain"],
190 capture_output=True,
191 text=True,
192 cwd=directory,
193 check=True,
194 )
196 status_lines = (
197 status_result.stdout.strip().split("\n")
198 if status_result.stdout.strip()
199 else []
200 )
202 return _parse_git_status(status_lines)
203 except subprocess.CalledProcessError:
204 return [], []
207def _parse_git_status(status_lines: list[str]) -> tuple[list[str], list[str]]:
208 """Parse git status output into modified and untracked files."""
209 modified_files = []
210 untracked_files = []
212 for line in status_lines:
213 if line:
214 status = line[:2]
215 filepath = line[3:]
216 if status == "??":
217 untracked_files.append(filepath)
218 elif status.strip():
219 modified_files.append(filepath)
221 return modified_files, untracked_files
224def stage_files(directory: Path, files: list[str]) -> bool:
225 """Stage files for commit."""
226 if not is_git_repository(directory) or not files:
227 return False
229 try:
230 for file in files:
231 subprocess.run(
232 ["git", "add", file],
233 cwd=directory,
234 capture_output=True,
235 check=True,
236 )
237 return True
238 except subprocess.CalledProcessError:
239 return False
242def get_staged_files(directory: Path) -> list[str]:
243 """Get list of staged files."""
244 if not is_git_repository(directory):
245 return []
247 try:
248 staged_result = subprocess.run(
249 ["git", "diff", "--cached", "--name-only"],
250 capture_output=True,
251 text=True,
252 cwd=directory,
253 check=True,
254 )
256 return (
257 staged_result.stdout.strip().split("\n")
258 if staged_result.stdout.strip()
259 else []
260 )
261 except subprocess.CalledProcessError:
262 return []
265def create_commit(directory: Path, message: str) -> tuple[bool, str]:
266 """Create a git commit with the given message.
268 Returns:
269 tuple: (success, commit_hash or error_message)
271 """
272 if not is_git_repository(directory):
273 return False, "Not a git repository"
275 try:
276 subprocess.run(
277 ["git", "commit", "-m", message],
278 capture_output=True,
279 text=True,
280 cwd=directory,
281 check=True,
282 )
284 # Get commit hash
285 hash_result = subprocess.run(
286 ["git", "rev-parse", "HEAD"],
287 capture_output=True,
288 text=True,
289 cwd=directory,
290 check=True,
291 )
293 commit_hash = hash_result.stdout.strip()[:8]
294 return True, commit_hash
296 except subprocess.CalledProcessError as e:
297 return False, e.stderr.strip() if e.stderr else str(e)
300def _add_worktree_context_output(
301 worktree_info: WorktreeInfo | None,
302 output: list[str],
303) -> None:
304 """Add worktree context information to output."""
305 if worktree_info:
306 if worktree_info.is_main_worktree:
307 output.append(f"📝 Main repository on branch '{worktree_info.branch}'")
308 else:
309 output.append(
310 f"🌿 Worktree on branch '{worktree_info.branch}' at {worktree_info.path}",
311 )
314def _create_checkpoint_message(
315 project: str,
316 quality_score: int,
317 worktree_info: WorktreeInfo | None,
318) -> str:
319 """Create the checkpoint commit message."""
320 timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
322 # Enhanced commit message with worktree info
323 worktree_suffix = ""
324 if worktree_info and not worktree_info.is_main_worktree:
325 worktree_suffix = f" [worktree: {worktree_info.branch}]"
327 commit_message = (
328 f"checkpoint: Session checkpoint - {timestamp}{worktree_suffix}\n\n"
329 f"Automatic checkpoint commit via session-management MCP server\n"
330 f"Project: {project}\n"
331 f"Quality Score: {quality_score}/100"
332 )
334 if worktree_info:
335 commit_message += f"\nBranch: {worktree_info.branch}"
336 if not worktree_info.is_main_worktree:
337 commit_message += f"\nWorktree: {worktree_info.path}"
339 return commit_message
342def _handle_staging_and_commit(
343 directory: Path,
344 modified_files: list[str],
345 project: str,
346 quality_score: int,
347 worktree_info: WorktreeInfo | None,
348 output: list[str],
349) -> tuple[bool, str]:
350 """Handle staging and committing of modified files."""
351 if not stage_files(directory, modified_files):
352 output.append("⚠️ Failed to stage files")
353 return False, "Failed to stage files"
355 staged_files = get_staged_files(directory)
356 if not staged_files:
357 output.append("ℹ️ No staged changes to commit")
358 return False, "No staged changes"
360 commit_message = _create_checkpoint_message(project, quality_score, worktree_info)
361 success, result = create_commit(directory, commit_message)
363 if success:
364 timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
365 output.append(f"✅ Checkpoint commit created: {result}")
366 output.append(f" Message: checkpoint: Session checkpoint - {timestamp}")
367 output.append(" 💡 Use 'git reset HEAD~1' to undo if needed")
368 return True, result
369 output.append(f"⚠️ Commit failed: {result}")
370 return False, result
373def create_checkpoint_commit(
374 directory: Path,
375 project: str,
376 quality_score: int,
377) -> tuple[bool, str, list[str]]:
378 """Create an automatic checkpoint commit.
380 Returns:
381 tuple: (success, commit_hash_or_error, output_messages)
383 """
384 output = []
386 if not is_git_repository(directory):
387 output.append("ℹ️ Not a git repository - skipping commit")
388 return False, "Not a git repository", output
390 try:
391 # Get worktree info for enhanced commit messages
392 worktree_info = get_worktree_info(directory)
393 modified_files, untracked_files = get_git_status(directory)
395 if not modified_files and not untracked_files:
396 output.append("✅ Working directory is clean - no changes to commit")
397 return True, "clean", output
399 # Add worktree context to output
400 _add_worktree_context_output(worktree_info, output)
401 output.append(
402 f"📝 Found {len(modified_files)} modified files and {len(untracked_files)} untracked files",
403 )
405 # Handle untracked files
406 if untracked_files:
407 output.extend(_format_untracked_files(untracked_files))
409 # Stage and commit modified files
410 if modified_files:
411 success, result = _handle_staging_and_commit(
412 directory,
413 modified_files,
414 project,
415 quality_score,
416 worktree_info,
417 output,
418 )
419 return success, result, output
420 if untracked_files:
421 output.append("ℹ️ No staged changes to commit")
422 output.append(
423 " 💡 Add untracked files with 'git add' if you want to include them",
424 )
425 return False, "No staged changes", output
427 except Exception as e:
428 error_msg = f"Git operations error: {e}"
429 output.append(f"⚠️ {error_msg}")
430 return False, error_msg, output
432 return False, "Unexpected error", output
435def _format_untracked_files(untracked_files: list[str]) -> list[str]:
436 """Format untracked files display."""
437 output = []
438 output.append("🆕 Untracked files found:")
440 for file in untracked_files[:10]: # Limit to first 10 for display
441 output.append(f" • {file}")
443 if len(untracked_files) > 10:
444 output.append(f" ... and {len(untracked_files) - 10} more")
446 output.append("⚠️ Please manually review and add untracked files if needed:")
447 output.append(" Use: git add <file> for files you want to include")
449 return output