Coverage for session_mgmt_mcp/worktree_manager.py: 0.00%

121 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-01 05:22 -0700

1#!/usr/bin/env python3 

2"""Git Worktree Management for Session Management MCP Server. 

3 

4Provides high-level worktree operations and coordination with session management. 

5""" 

6 

7import subprocess 

8from pathlib import Path 

9from typing import Any 

10 

11from .utils.git_operations import ( 

12 WorktreeInfo, 

13 get_worktree_info, 

14 is_git_repository, 

15 list_worktrees, 

16) 

17 

18 

19class WorktreeManager: 

20 """Manages git worktrees with session coordination.""" 

21 

22 def __init__(self, session_logger=None) -> None: 

23 self.session_logger = session_logger 

24 

25 def _log(self, message: str, level: str = "info", **context) -> None: 

26 """Log messages if logger available.""" 

27 if self.session_logger: 

28 getattr(self.session_logger, level)(message, **context) 

29 

30 async def list_worktrees(self, directory: Path) -> dict[str, Any]: 

31 """List all worktrees with enhanced information.""" 

32 if not is_git_repository(directory): 

33 return {"success": False, "error": "Not a git repository", "worktrees": []} 

34 

35 try: 

36 worktrees = list_worktrees(directory) 

37 current_worktree = get_worktree_info(directory) 

38 

39 worktree_data = [] 

40 for wt in worktrees: 

41 wt_data = { 

42 "path": str(wt.path), 

43 "branch": wt.branch, 

44 "is_main": wt.is_main_worktree, 

45 "is_current": current_worktree and wt.path == current_worktree.path, 

46 "is_detached": wt.is_detached, 

47 "is_bare": wt.is_bare, 

48 "locked": wt.locked, 

49 "prunable": wt.prunable, 

50 "exists": wt.path.exists() 

51 if isinstance(wt.path, Path) 

52 else Path(wt.path).exists(), 

53 } 

54 

55 # Add session info if available 

56 wt_data["has_session"] = self._check_session_exists(wt.path) 

57 

58 worktree_data.append(wt_data) 

59 

60 self._log("Listed worktrees", worktrees_count=len(worktree_data)) 

61 

62 return { 

63 "success": True, 

64 "worktrees": worktree_data, 

65 "current_worktree": str(current_worktree.path) 

66 if current_worktree 

67 else None, 

68 "total_count": len(worktree_data), 

69 } 

70 

71 except Exception as e: 

72 self._log(f"Failed to list worktrees: {e}", level="error") 

73 return {"success": False, "error": str(e), "worktrees": []} 

74 

75 async def create_worktree( 

76 self, 

77 repository_path: Path, 

78 new_path: Path, 

79 branch: str, 

80 create_branch: bool = False, 

81 checkout_existing: bool = False, 

82 ) -> dict[str, Any]: 

83 """Create a new worktree.""" 

84 if not is_git_repository(repository_path): 

85 return { 

86 "success": False, 

87 "error": "Source directory is not a git repository", 

88 } 

89 

90 if new_path.exists(): 

91 return { 

92 "success": False, 

93 "error": f"Target path already exists: {new_path}", 

94 } 

95 

96 try: 

97 # Build git worktree add command 

98 cmd = ["git", "worktree", "add"] 

99 

100 if create_branch: 

101 cmd.extend(["-b", branch]) 

102 elif checkout_existing: 

103 cmd.extend(["--track", "-B", branch]) 

104 

105 cmd.extend([str(new_path), branch]) 

106 

107 # Execute git worktree add 

108 result = subprocess.run( 

109 cmd, 

110 cwd=repository_path, 

111 capture_output=True, 

112 text=True, 

113 check=True, 

114 ) 

115 

116 # Verify worktree was created 

117 worktree_info = get_worktree_info(new_path) 

118 if not worktree_info: 

119 return { 

120 "success": False, 

121 "error": "Worktree was created but cannot be accessed", 

122 } 

123 

124 self._log("Created worktree", path=str(new_path), branch=branch) 

125 

126 return { 

127 "success": True, 

128 "worktree_path": str(new_path), 

129 "branch": branch, 

130 "worktree_info": { 

131 "path": str(worktree_info.path), 

132 "branch": worktree_info.branch, 

133 "is_main": worktree_info.is_main_worktree, 

134 "is_detached": worktree_info.is_detached, 

135 }, 

136 "output": result.stdout.strip(), 

137 } 

138 

139 except subprocess.CalledProcessError as e: 

140 error_msg = e.stderr.strip() if e.stderr else str(e) 

141 self._log(f"Failed to create worktree: {error_msg}", level="error") 

142 return {"success": False, "error": error_msg} 

143 except Exception as e: 

144 self._log(f"Unexpected error creating worktree: {e}", level="error") 

145 return {"success": False, "error": str(e)} 

146 

147 async def remove_worktree( 

148 self, 

149 repository_path: Path, 

150 worktree_path: Path, 

151 force: bool = False, 

152 ) -> dict[str, Any]: 

153 """Remove an existing worktree.""" 

154 if not is_git_repository(repository_path): 

155 return { 

156 "success": False, 

157 "error": "Source directory is not a git repository", 

158 } 

159 

160 try: 

161 # Build git worktree remove command 

162 cmd = ["git", "worktree", "remove"] 

163 

164 if force: 

165 cmd.append("--force") 

166 

167 cmd.append(str(worktree_path)) 

168 

169 # Execute git worktree remove 

170 result = subprocess.run( 

171 cmd, 

172 cwd=repository_path, 

173 capture_output=True, 

174 text=True, 

175 check=True, 

176 ) 

177 

178 self._log("Removed worktree", path=str(worktree_path)) 

179 

180 return { 

181 "success": True, 

182 "removed_path": str(worktree_path), 

183 "output": result.stdout.strip() 

184 if result.stdout.strip() 

185 else "Worktree removed successfully", 

186 } 

187 

188 except subprocess.CalledProcessError as e: 

189 error_msg = e.stderr.strip() if e.stderr else str(e) 

190 self._log(f"Failed to remove worktree: {error_msg}", level="error") 

191 return {"success": False, "error": error_msg} 

192 except Exception as e: 

193 self._log(f"Unexpected error removing worktree: {e}", level="error") 

194 return {"success": False, "error": str(e)} 

195 

196 async def prune_worktrees(self, repository_path: Path) -> dict[str, Any]: 

197 """Prune stale worktree references.""" 

198 if not is_git_repository(repository_path): 

199 return {"success": False, "error": "Directory is not a git repository"} 

200 

201 try: 

202 # Execute git worktree prune 

203 result = subprocess.run( 

204 ["git", "worktree", "prune", "--verbose"], 

205 cwd=repository_path, 

206 capture_output=True, 

207 text=True, 

208 check=True, 

209 ) 

210 

211 output_lines = ( 

212 result.stdout.strip().split("\n") if result.stdout.strip() else [] 

213 ) 

214 pruned_count = len([line for line in output_lines if "Removing" in line]) 

215 

216 self._log("Pruned worktrees", pruned_count=pruned_count) 

217 

218 return { 

219 "success": True, 

220 "pruned_count": pruned_count, 

221 "output": result.stdout.strip() 

222 if result.stdout.strip() 

223 else "No worktrees to prune", 

224 } 

225 

226 except subprocess.CalledProcessError as e: 

227 error_msg = e.stderr.strip() if e.stderr else str(e) 

228 self._log(f"Failed to prune worktrees: {error_msg}", level="error") 

229 return {"success": False, "error": error_msg} 

230 

231 async def get_worktree_status(self, directory: Path) -> dict[str, Any]: 

232 """Get comprehensive status for current worktree and all related worktrees.""" 

233 if not is_git_repository(directory): 

234 return {"success": False, "error": "Not a git repository"} 

235 

236 try: 

237 current_worktree = get_worktree_info(directory) 

238 all_worktrees = list_worktrees(directory) 

239 

240 if not current_worktree: 

241 return { 

242 "success": False, 

243 "error": "Could not determine current worktree info", 

244 } 

245 

246 # Enhanced status with session coordination 

247 return { 

248 "success": True, 

249 "current_worktree": { 

250 "path": str(current_worktree.path), 

251 "branch": current_worktree.branch, 

252 "is_main": current_worktree.is_main_worktree, 

253 "is_detached": current_worktree.is_detached, 

254 "has_session": self._check_session_exists(current_worktree.path), 

255 }, 

256 "all_worktrees": [ 

257 { 

258 "path": str(wt.path), 

259 "branch": wt.branch, 

260 "is_main": wt.is_main_worktree, 

261 "is_current": wt.path == current_worktree.path, 

262 "exists": wt.path.exists() 

263 if isinstance(wt.path, Path) 

264 else Path(wt.path).exists(), 

265 "has_session": self._check_session_exists(wt.path), 

266 "prunable": wt.prunable, 

267 } 

268 for wt in all_worktrees 

269 ], 

270 "total_worktrees": len(all_worktrees), 

271 "session_summary": self._get_session_summary(all_worktrees), 

272 } 

273 

274 except Exception as e: 

275 self._log(f"Failed to get worktree status: {e}", level="error") 

276 return {"success": False, "error": str(e)} 

277 

278 def _check_session_exists(self, path: Path) -> bool: 

279 """Check if a worktree has an active session (placeholder).""" 

280 # This would integrate with session management 

281 # For now, just check if it's a valid path 

282 if isinstance(path, str): 

283 path = Path(path) 

284 return path.exists() and (path / ".git").exists() 

285 

286 def _get_session_summary(self, worktrees: list[WorktreeInfo]) -> dict[str, Any]: 

287 """Get summary of sessions across worktrees.""" 

288 active_sessions = 0 

289 branches = set() 

290 

291 for wt in worktrees: 

292 if self._check_session_exists(wt.path): 

293 active_sessions += 1 

294 branches.add(wt.branch) 

295 

296 return { 

297 "active_sessions": active_sessions, 

298 "unique_branches": len(branches), 

299 "branches": list(branches), 

300 } 

301 

302 async def switch_worktree_context( 

303 self, 

304 from_path: Path, 

305 to_path: Path, 

306 ) -> dict[str, Any]: 

307 """Coordinate switching between worktrees with session preservation.""" 

308 try: 

309 # Validate both paths 

310 if not is_git_repository(from_path): 

311 return { 

312 "success": False, 

313 "error": f"Source path is not a git repository: {from_path}", 

314 } 

315 

316 if not is_git_repository(to_path): 

317 return { 

318 "success": False, 

319 "error": f"Target path is not a git repository: {to_path}", 

320 } 

321 

322 from_worktree = get_worktree_info(from_path) 

323 to_worktree = get_worktree_info(to_path) 

324 

325 if not from_worktree or not to_worktree: 

326 return { 

327 "success": False, 

328 "error": "Could not get worktree information for context switch", 

329 } 

330 

331 # This would integrate with session management to: 

332 # 1. Save current session state 

333 # 2. Switch working directory context 

334 # 3. Restore/create session for target worktree 

335 

336 self._log( 

337 "Context switch completed", 

338 from_branch=from_worktree.branch, 

339 to_branch=to_worktree.branch, 

340 ) 

341 

342 return { 

343 "success": True, 

344 "from_worktree": { 

345 "path": str(from_worktree.path), 

346 "branch": from_worktree.branch, 

347 }, 

348 "to_worktree": { 

349 "path": str(to_worktree.path), 

350 "branch": to_worktree.branch, 

351 }, 

352 "context_preserved": True, 

353 "message": f"Switched from {from_worktree.branch} to {to_worktree.branch}", 

354 } 

355 

356 except Exception as e: 

357 self._log(f"Failed to switch worktree context: {e}", level="error") 

358 return {"success": False, "error": str(e)}