Coverage for session_mgmt_mcp/worktree_manager.py: 0.00%
121 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 05:22 -0700
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 05:22 -0700
1#!/usr/bin/env python3
2"""Git Worktree Management for Session Management MCP Server.
4Provides high-level worktree operations and coordination with session management.
5"""
7import subprocess
8from pathlib import Path
9from typing import Any
11from .utils.git_operations import (
12 WorktreeInfo,
13 get_worktree_info,
14 is_git_repository,
15 list_worktrees,
16)
class WorktreeManager:
    """Manages git worktrees with session coordination.

    The public coroutines return plain dictionaries carrying a boolean
    ``"success"`` key. Git failures and unexpected errors raised inside the
    guarded sections are reported as ``{"success": False, "error": ...}``
    and logged through the optional session logger.
    """

    def __init__(self, session_logger=None) -> None:
        """Remember the optional logger that :meth:`_log` forwards to.

        Args:
            session_logger: Object exposing level-named methods (``info``,
                ``error``, ...) that accept a message plus keyword context.
                ``None`` disables logging.
        """
        self.session_logger = session_logger

    def _log(self, message: str, level: str = "info", **context) -> None:
        """Log messages if logger available.

        Args:
            message: Text handed to the logger.
            level: Name of the logger method to call (``"info"`` by default).
            **context: Extra keyword arguments forwarded unchanged.
        """
        if self.session_logger:
            getattr(self.session_logger, level)(message, **context)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _run_git(cmd: list[str], cwd: Path) -> subprocess.CompletedProcess:
        """Run *cmd* in *cwd* capturing text output; raise on non-zero exit."""
        return subprocess.run(
            cmd,
            cwd=cwd,
            capture_output=True,
            text=True,
            check=True,
        )

    @staticmethod
    def _git_error_message(error: subprocess.CalledProcessError) -> str:
        """Return git's stderr for *error*, or its string form when stderr is empty."""
        return error.stderr.strip() if error.stderr else str(error)

    @staticmethod
    def _path_exists(path) -> bool:
        """Return whether *path* (a ``str`` or ``Path``) exists on disk."""
        return Path(path).exists()

    @staticmethod
    def _build_add_command(
        new_path: Path,
        branch: str,
        *,
        create_branch: bool,
        checkout_existing: bool,
    ) -> list[str]:
        """Build the ``git worktree add`` argument vector.

        ``create_branch`` takes precedence over ``checkout_existing``.
        """
        cmd = ["git", "worktree", "add"]
        if create_branch:
            # The branch does not exist yet, so it must not also be passed as
            # the start-point commit-ish; git would reject it as an invalid
            # reference. Without a commit-ish the new branch starts at HEAD.
            cmd.extend(["-b", branch, str(new_path)])
            return cmd
        if checkout_existing:
            cmd.extend(["--track", "-B", branch])
        cmd.extend([str(new_path), branch])
        return cmd

    @staticmethod
    def _summarize_prune_output(stdout: str, stderr: str) -> tuple[int, str]:
        """Return ``(pruned_count, report)`` for ``git worktree prune --verbose``.

        Both streams are inspected because the "Removing ..." lines are not
        guaranteed to arrive on stdout (newer git releases appear to print
        them on stderr; confirm against the git release notes).
        NOTE(review): the "Removing" match presumably relies on git running
        with English messages.
        """
        report = "\n".join(
            part for part in ((stdout or "").strip(), (stderr or "").strip()) if part
        )
        pruned_count = sum("Removing" in line for line in report.splitlines())
        return pruned_count, report

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def list_worktrees(self, directory: Path) -> dict[str, Any]:
        """List all worktrees with enhanced information.

        Args:
            directory: Path inside the repository to inspect.

        Returns:
            On success: ``success``, ``worktrees`` (one dict per worktree),
            ``current_worktree`` (path string or ``None``) and
            ``total_count``. On failure: ``success=False``, ``error`` and an
            empty ``worktrees`` list.
        """
        if not is_git_repository(directory):
            return {"success": False, "error": "Not a git repository", "worktrees": []}

        try:
            worktrees = list_worktrees(directory)
            current_worktree = get_worktree_info(directory)

            worktree_data = []
            for wt in worktrees:
                wt_data = {
                    "path": str(wt.path),
                    "branch": wt.branch,
                    "is_main": wt.is_main_worktree,
                    # Coerce to bool so an unknown current worktree yields
                    # False rather than None.
                    "is_current": bool(
                        current_worktree and wt.path == current_worktree.path
                    ),
                    "is_detached": wt.is_detached,
                    "is_bare": wt.is_bare,
                    "locked": wt.locked,
                    "prunable": wt.prunable,
                    "exists": self._path_exists(wt.path),
                }

                # Add session info if available
                wt_data["has_session"] = self._check_session_exists(wt.path)

                worktree_data.append(wt_data)

            self._log("Listed worktrees", worktrees_count=len(worktree_data))

            return {
                "success": True,
                "worktrees": worktree_data,
                "current_worktree": str(current_worktree.path)
                if current_worktree
                else None,
                "total_count": len(worktree_data),
            }

        except Exception as e:
            self._log(f"Failed to list worktrees: {e}", level="error")
            return {"success": False, "error": str(e), "worktrees": []}

    async def create_worktree(
        self,
        repository_path: Path,
        new_path: Path,
        branch: str,
        create_branch: bool = False,
        checkout_existing: bool = False,
    ) -> dict[str, Any]:
        """Create a new worktree.

        Args:
            repository_path: Existing repository; git is run from here.
            new_path: Location of the new worktree; must not exist yet.
            branch: Branch to check out (or create) in the new worktree.
            create_branch: Create ``branch`` with ``-b`` starting from HEAD.
            checkout_existing: Pass ``--track -B`` for ``branch``. Ignored
                when ``create_branch`` is true.

        Returns:
            On success: ``success``, ``worktree_path``, ``branch``,
            ``worktree_info`` and git's ``output``. Otherwise
            ``success=False`` with an ``error`` message.
        """
        if not is_git_repository(repository_path):
            return {
                "success": False,
                "error": "Source directory is not a git repository",
            }

        if new_path.exists():
            return {
                "success": False,
                "error": f"Target path already exists: {new_path}",
            }

        try:
            cmd = self._build_add_command(
                new_path,
                branch,
                create_branch=create_branch,
                checkout_existing=checkout_existing,
            )
            result = self._run_git(cmd, repository_path)

            # Verify worktree was created
            worktree_info = get_worktree_info(new_path)
            if not worktree_info:
                return {
                    "success": False,
                    "error": "Worktree was created but cannot be accessed",
                }

            self._log("Created worktree", path=str(new_path), branch=branch)

            return {
                "success": True,
                "worktree_path": str(new_path),
                "branch": branch,
                "worktree_info": {
                    "path": str(worktree_info.path),
                    "branch": worktree_info.branch,
                    "is_main": worktree_info.is_main_worktree,
                    "is_detached": worktree_info.is_detached,
                },
                "output": result.stdout.strip(),
            }

        except subprocess.CalledProcessError as e:
            error_msg = self._git_error_message(e)
            self._log(f"Failed to create worktree: {error_msg}", level="error")
            return {"success": False, "error": error_msg}
        except Exception as e:
            self._log(f"Unexpected error creating worktree: {e}", level="error")
            return {"success": False, "error": str(e)}

    async def remove_worktree(
        self,
        repository_path: Path,
        worktree_path: Path,
        force: bool = False,
    ) -> dict[str, Any]:
        """Remove an existing worktree.

        Args:
            repository_path: Repository from which git is run.
            worktree_path: Worktree to remove.
            force: Add ``--force`` to the git command.

        Returns:
            On success: ``success``, ``removed_path`` and ``output`` (git's
            stdout, or a default message when stdout is empty). Otherwise
            ``success=False`` with an ``error`` message.
        """
        if not is_git_repository(repository_path):
            return {
                "success": False,
                "error": "Source directory is not a git repository",
            }

        try:
            cmd = ["git", "worktree", "remove"]
            if force:
                cmd.append("--force")
            cmd.append(str(worktree_path))

            result = self._run_git(cmd, repository_path)

            self._log("Removed worktree", path=str(worktree_path))

            return {
                "success": True,
                "removed_path": str(worktree_path),
                "output": result.stdout.strip() or "Worktree removed successfully",
            }

        except subprocess.CalledProcessError as e:
            error_msg = self._git_error_message(e)
            self._log(f"Failed to remove worktree: {error_msg}", level="error")
            return {"success": False, "error": error_msg}
        except Exception as e:
            self._log(f"Unexpected error removing worktree: {e}", level="error")
            return {"success": False, "error": str(e)}

    async def prune_worktrees(self, repository_path: Path) -> dict[str, Any]:
        """Prune stale worktree references.

        Args:
            repository_path: Repository from which git is run.

        Returns:
            On success: ``success``, ``pruned_count`` (number of reported
            "Removing" lines) and ``output``. Otherwise ``success=False``
            with an ``error`` message.
        """
        if not is_git_repository(repository_path):
            return {"success": False, "error": "Directory is not a git repository"}

        try:
            result = self._run_git(
                ["git", "worktree", "prune", "--verbose"],
                repository_path,
            )

            pruned_count, report = self._summarize_prune_output(
                result.stdout, result.stderr
            )

            self._log("Pruned worktrees", pruned_count=pruned_count)

            return {
                "success": True,
                "pruned_count": pruned_count,
                "output": report or "No worktrees to prune",
            }

        except subprocess.CalledProcessError as e:
            error_msg = self._git_error_message(e)
            self._log(f"Failed to prune worktrees: {error_msg}", level="error")
            return {"success": False, "error": error_msg}
        except Exception as e:
            # Mirrors the sibling methods: e.g. a missing git executable is
            # reported in the result instead of escaping to the caller.
            self._log(f"Unexpected error pruning worktrees: {e}", level="error")
            return {"success": False, "error": str(e)}

    async def get_worktree_status(self, directory: Path) -> dict[str, Any]:
        """Get comprehensive status for current worktree and all related worktrees.

        Args:
            directory: Path inside the repository to inspect.

        Returns:
            On success: ``success``, ``current_worktree``, ``all_worktrees``,
            ``total_worktrees`` and ``session_summary``. Otherwise
            ``success=False`` with an ``error`` message.
        """
        if not is_git_repository(directory):
            return {"success": False, "error": "Not a git repository"}

        try:
            current_worktree = get_worktree_info(directory)
            all_worktrees = list_worktrees(directory)

            if not current_worktree:
                return {
                    "success": False,
                    "error": "Could not determine current worktree info",
                }

            # Enhanced status with session coordination
            return {
                "success": True,
                "current_worktree": {
                    "path": str(current_worktree.path),
                    "branch": current_worktree.branch,
                    "is_main": current_worktree.is_main_worktree,
                    "is_detached": current_worktree.is_detached,
                    "has_session": self._check_session_exists(current_worktree.path),
                },
                "all_worktrees": [
                    {
                        "path": str(wt.path),
                        "branch": wt.branch,
                        "is_main": wt.is_main_worktree,
                        "is_current": wt.path == current_worktree.path,
                        "exists": self._path_exists(wt.path),
                        "has_session": self._check_session_exists(wt.path),
                        "prunable": wt.prunable,
                    }
                    for wt in all_worktrees
                ],
                "total_worktrees": len(all_worktrees),
                "session_summary": self._get_session_summary(all_worktrees),
            }

        except Exception as e:
            self._log(f"Failed to get worktree status: {e}", level="error")
            return {"success": False, "error": str(e)}

    def _check_session_exists(self, path: Path) -> bool:
        """Check if a worktree has an active session (placeholder).

        Currently this only verifies that *path* exists and contains a
        ``.git`` entry; real session lookup is not implemented yet.
        A ``str`` path is accepted and converted.
        """
        # This would integrate with session management
        # For now, just check if it's a valid path
        path = Path(path)
        return path.exists() and (path / ".git").exists()

    def _get_session_summary(self, worktrees: list["WorktreeInfo"]) -> dict[str, Any]:
        """Get summary of sessions across worktrees.

        Returns:
            ``active_sessions`` (count of worktrees passing
            :meth:`_check_session_exists`), ``unique_branches`` and
            ``branches`` (distinct branch values, in no particular order).
        """
        active_sessions = 0
        branches = set()

        for wt in worktrees:
            if self._check_session_exists(wt.path):
                active_sessions += 1
            branches.add(wt.branch)

        return {
            "active_sessions": active_sessions,
            "unique_branches": len(branches),
            "branches": list(branches),
        }

    async def switch_worktree_context(
        self,
        from_path: Path,
        to_path: Path,
    ) -> dict[str, Any]:
        """Coordinate switching between worktrees with session preservation.

        At present this validates both paths, looks up their worktree
        information and reports the switch; no session state is saved or
        restored yet (see the placeholder comment in the body).

        Args:
            from_path: Worktree being left.
            to_path: Worktree being entered.

        Returns:
            On success: ``success``, ``from_worktree``, ``to_worktree``,
            ``context_preserved`` and ``message``. Otherwise
            ``success=False`` with an ``error`` message.
        """
        try:
            # Validate both paths
            if not is_git_repository(from_path):
                return {
                    "success": False,
                    "error": f"Source path is not a git repository: {from_path}",
                }

            if not is_git_repository(to_path):
                return {
                    "success": False,
                    "error": f"Target path is not a git repository: {to_path}",
                }

            from_worktree = get_worktree_info(from_path)
            to_worktree = get_worktree_info(to_path)

            if not from_worktree or not to_worktree:
                return {
                    "success": False,
                    "error": "Could not get worktree information for context switch",
                }

            # This would integrate with session management to:
            # 1. Save current session state
            # 2. Switch working directory context
            # 3. Restore/create session for target worktree

            self._log(
                "Context switch completed",
                from_branch=from_worktree.branch,
                to_branch=to_worktree.branch,
            )

            return {
                "success": True,
                "from_worktree": {
                    "path": str(from_worktree.path),
                    "branch": from_worktree.branch,
                },
                "to_worktree": {
                    "path": str(to_worktree.path),
                    "branch": to_worktree.branch,
                },
                "context_preserved": True,
                "message": f"Switched from {from_worktree.branch} to {to_worktree.branch}",
            }

        except Exception as e:
            self._log(f"Failed to switch worktree context: {e}", level="error")
            return {"success": False, "error": str(e)}