Coverage for little_loops / parallel / merge_coordinator.py: 84%

424 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""Merge coordinator for sequential integration of parallel worker changes. 

2 

3Handles merging completed worker branches back to main with conflict detection 

4and automatic retry capability. 

5""" 

6 

7from __future__ import annotations 

8 

9import shutil 

10import subprocess 

11import threading 

12import time 

13from pathlib import Path 

14from queue import Empty, Queue 

15from typing import TYPE_CHECKING 

16 

17from little_loops.parallel.git_lock import GitLock 

18from little_loops.parallel.types import ( 

19 MergeRequest, 

20 MergeStatus, 

21 ParallelConfig, 

22 WorkerResult, 

23) 

24 

25if TYPE_CHECKING: 

26 from little_loops.logger import Logger 

27 

28 

29class MergeCoordinator: 

30 """Sequential merge queue with conflict handling. 

31 

32 Processes merge requests one at a time to avoid conflicts. Supports 

33 automatic rebase and retry on merge failures. Handles uncommitted local 

34 changes by stashing them before merge operations. 

35 

36 Example: 

37 >>> coordinator = MergeCoordinator(config, logger, repo_path) 

38 >>> coordinator.start() 

39 >>> coordinator.queue_merge(worker_result) 

40 >>> # ... later ... 

41 >>> coordinator.shutdown() 

42 """ 

43 

44 def __init__( 

45 self, 

46 config: ParallelConfig, 

47 logger: Logger, 

48 repo_path: Path | None = None, 

49 git_lock: GitLock | None = None, 

50 ) -> None: 

51 """Initialize the merge coordinator. 

52 

53 Args: 

54 config: Parallel processing configuration 

55 logger: Logger for merge output 

56 repo_path: Path to the git repository (default: current directory) 

57 git_lock: Shared lock for git operations (created if not provided) 

58 """ 

59 self.config = config 

60 self.logger = logger 

61 self.repo_path = repo_path or Path.cwd() 

62 self._git_lock = git_lock or GitLock(logger) 

63 self._queue: Queue[MergeRequest] = Queue() 

64 self._thread: threading.Thread | None = None 

65 self._shutdown_event = threading.Event() 

66 self._merged: list[str] = [] 

67 self._failed: dict[str, str] = {} 

68 self._lock = threading.Lock() 

69 self._stash_active = False # Track if we have an active stash 

70 self._consecutive_failures = 0 # Circuit breaker counter 

71 self._paused = False # Set when circuit breaker trips 

72 self._assume_unchanged_active = False # Track if state file is marked assume-unchanged 

73 self._stash_pop_failures: dict[str, str] = {} # issue_id -> failure message 

74 self._current_issue_id: str | None = ( 

75 None # Track current issue for stash failure attribution 

76 ) 

77 self._problematic_commits: set[str] = ( 

78 set() 

79 ) # Track commits causing repeated rebase conflicts 

80 

def start(self) -> None:
    """Launch the background merge thread unless one is already alive."""
    existing = self._thread
    if existing is not None and existing.is_alive():
        # A live thread is already consuming the queue; nothing to do.
        return

    # Clear any earlier shutdown request so the new loop actually runs.
    self._shutdown_event.clear()
    worker = threading.Thread(
        name="merge-coordinator",
        target=self._merge_loop,
        daemon=True,
    )
    self._thread = worker
    worker.start()
    self.logger.info("Merge coordinator started")

94 

def shutdown(self, wait: bool = True, timeout: float = 30.0) -> None:
    """Signal the merge thread to stop and optionally wait for it to exit.

    Setting the shutdown event makes the merge loop exit the next time it
    checks the event, so a merge that is already running is allowed to
    finish but requests still sitting in the queue are not drained.

    Args:
        wait: Whether to block until the merge thread has exited.
        timeout: Maximum number of seconds to wait for the thread to exit.
    """
    worker = self._thread
    if worker is None:
        # Never started (or already shut down): nothing to signal.
        return

    self._shutdown_event.set()

    timed_out = False
    if wait and worker.is_alive():
        worker.join(timeout=timeout)
        # join() returns silently on timeout, so re-check liveness to avoid
        # reporting a clean stop while a merge is still in flight.
        timed_out = worker.is_alive()

    self._thread = None
    if timed_out:
        self.logger.warning(
            f"Merge coordinator did not stop within {timeout}s; "
            "merge thread is still running"
        )
    else:
        self.logger.info("Merge coordinator stopped")

112 

def queue_merge(self, worker_result: WorkerResult) -> None:
    """Wrap a finished worker's result in a merge request and enqueue it.

    Args:
        worker_result: Result produced by a completed worker.
    """
    self._queue.put(MergeRequest(worker_result=worker_result))
    issue = worker_result.issue_id
    branch = worker_result.branch_name
    self.logger.info(f"Queued merge for {issue} (branch: {branch})")

124 

125 def _stash_local_changes(self) -> bool: 

126 """Stash any uncommitted tracked changes in the main repo. 

127 

128 Only stashes tracked file modifications. Untracked files are not stashed 

129 because git stash pathspec exclusions don't work reliably with -u flag. 

130 Untracked file conflicts during merge are handled by _handle_untracked_conflict. 

131 

132 The following are explicitly excluded from stashing: 

133 1. State file - managed by orchestrator and continuously updated 

134 2. Lifecycle file moves - issue files being moved to completed/ directory 

135 3. Files in completed directory - lifecycle-managed files 

136 4. Claude Code context state file - managed by Claude Code externally 

137 

138 These exclusions prevent stash pop conflicts after merge, since the merge 

139 may change HEAD and create conflicts with stashed rename operations. 

140 

141 Returns: 

142 True if changes were stashed, False if working tree was clean 

143 """ 

144 state_file_path = Path(self.config.state_file) 

145 state_file_str = str(state_file_path) 

146 state_file_name = state_file_path.name 

147 

148 # Check if there are any tracked changes to stash. 

149 # We only look at tracked files (exclude untracked with grep -v '??') 

150 # since we can only reliably stash tracked changes. 

151 status_result = self._git_lock.run( 

152 ["status", "--porcelain"], 

153 cwd=self.repo_path, 

154 timeout=30, 

155 ) 

156 

157 # Filter to only tracked changes (lines not starting with ??) 

158 # and exclude orchestrator-managed files to prevent stash pop conflicts 

159 tracked_changes = [] 

160 files_to_stash = [] 

161 # Note: Don't use .strip() on the full output - it removes leading spaces 

162 # from the first line which are significant in git status porcelain format 

163 for line in status_result.stdout.splitlines(): 

164 if not line or line.startswith("??"): 

165 continue 

166 # Extract file path from porcelain format (XY filename or XY -> filename for renames) 

167 # Format: XY filename or XY old -> new (XY is exactly 2 chars + 1 space) 

168 file_path = line[3:].split(" -> ")[-1].strip() 

169 if file_path == state_file_str or file_path.endswith(state_file_name): 

170 continue # Skip state file - orchestrator manages it independently 

171 # Skip Claude Code context state file - managed externally 

172 if file_path.endswith("ll-context-state.json"): 

173 self.logger.debug(f"Skipping Claude context state file from stash: {file_path}") 

174 continue 

175 tracked_changes.append(line) 

176 files_to_stash.append(file_path) 

177 

178 if not files_to_stash: 

179 return False # No tracked changes to stash (excluding state file) 

180 

181 # Log files to be stashed for debugging 

182 self.logger.debug(f"Tracked files to stash: {tracked_changes[:10]}") 

183 

184 # Stash only specific files, explicitly excluding the state file. 

185 # Using explicit file list avoids race conditions where the orchestrator 

186 # modifies the state file between a checkout and stash-all operation. 

187 # Note: gitignored files are never stashed anyway. 

188 stash_result = self._git_lock.run( 

189 [ 

190 "stash", 

191 "push", 

192 "-m", 

193 "ll-parallel: auto-stash before merge", 

194 "--", 

195 *files_to_stash, 

196 ], 

197 cwd=self.repo_path, 

198 timeout=30, 

199 ) 

200 

201 if stash_result.returncode == 0: 

202 self._stash_active = True 

203 self.logger.info("Stashed local changes before merge") 

204 return True 

205 

206 self.logger.error(f"Failed to stash local changes: {stash_result.stderr}") 

207 return False 

208 

209 def _pop_stash(self) -> bool: 

210 """Restore stashed changes if any were stashed. 

211 

212 Important: This method preserves the merge even if stash pop fails. 

213 We never reset --hard HEAD here because that would undo a successful merge. 

214 

215 Returns: 

216 True if stash was successfully popped or no stash was active, 

217 False if pop failed (stash is left for manual recovery). 

218 """ 

219 if not self._stash_active: 

220 return True 

221 

222 pop_result = self._git_lock.run( 

223 ["stash", "pop"], 

224 cwd=self.repo_path, 

225 timeout=30, 

226 ) 

227 

228 if pop_result.returncode != 0: 

229 self.logger.warning(f"Failed to pop stash: {pop_result.stderr.strip()}") 

230 

231 # Check if it's a conflict issue - in that case, stash pop may have 

232 # partially applied. We need to clean up the index but preserve the merge. 

233 status_result = self._git_lock.run( 

234 ["status", "--porcelain"], 

235 cwd=self.repo_path, 

236 timeout=30, 

237 ) 

238 

239 # Check for unmerged entries from the stash pop attempt 

240 unmerged_prefixes = ("UU", "AA", "DD", "AU", "UA", "DU", "UD") 

241 has_unmerged = any( 

242 line[:2] in unmerged_prefixes 

243 for line in status_result.stdout.splitlines() 

244 if len(line) >= 2 

245 ) 

246 

247 if has_unmerged: 

248 # Clean up the conflicted stash pop without affecting the merge 

249 # Use checkout to restore conflicted files to their post-merge state 

250 self._git_lock.run( 

251 ["checkout", "--theirs", "."], 

252 cwd=self.repo_path, 

253 timeout=30, 

254 ) 

255 self._git_lock.run( 

256 ["reset", "HEAD"], 

257 cwd=self.repo_path, 

258 timeout=30, 

259 ) 

260 self.logger.info("Cleaned up conflicted stash pop, merge preserved") 

261 

262 # Leave the stash intact for manual recovery 

263 self._stash_active = False 

264 

265 # Record this failure for reporting in final summary 

266 if self._current_issue_id: 

267 with self._lock: 

268 self._stash_pop_failures[self._current_issue_id] = ( 

269 "Local changes could not be restored after merge. " 

270 "Run 'git stash list' and 'git stash pop' to recover manually." 

271 ) 

272 

273 self.logger.warning( 

274 "Stash could not be restored - your changes are saved in 'git stash list'. " 

275 "Run 'git stash show' to view and 'git stash pop' to retry manually." 

276 ) 

277 return False 

278 

279 self._stash_active = False 

280 self.logger.info("Restored stashed local changes") 

281 return True 

282 

283 def _mark_state_file_assume_unchanged(self) -> bool: 

284 """Mark the state file as assume-unchanged to prevent git from seeing modifications. 

285 

286 This allows git pull --rebase to proceed even when the state file is modified, 

287 since the orchestrator continuously updates it during processing. 

288 

289 Returns: 

290 True if successfully marked, False otherwise 

291 """ 

292 state_file = str(self.config.state_file) 

293 

294 # Check if file exists and is tracked 

295 ls_files = self._git_lock.run( 

296 ["ls-files", state_file], 

297 cwd=self.repo_path, 

298 timeout=10, 

299 ) 

300 

301 if not ls_files.stdout.strip(): 

302 # File not tracked, nothing to do 

303 return True 

304 

305 result = self._git_lock.run( 

306 ["update-index", "--assume-unchanged", state_file], 

307 cwd=self.repo_path, 

308 timeout=10, 

309 ) 

310 

311 if result.returncode == 0: 

312 self._assume_unchanged_active = True 

313 self.logger.debug(f"Marked {state_file} as assume-unchanged") 

314 return True 

315 

316 self.logger.warning(f"Failed to mark state file assume-unchanged: {result.stderr}") 

317 return False 

318 

319 def _restore_state_file_tracking(self) -> bool: 

320 """Restore normal tracking for the state file. 

321 

322 Returns: 

323 True if successfully restored, False otherwise 

324 """ 

325 if not self._assume_unchanged_active: 

326 return True 

327 

328 state_file = str(self.config.state_file) 

329 

330 result = self._git_lock.run( 

331 ["update-index", "--no-assume-unchanged", state_file], 

332 cwd=self.repo_path, 

333 timeout=10, 

334 ) 

335 

336 self._assume_unchanged_active = False 

337 

338 if result.returncode != 0: 

339 self.logger.warning(f"Failed to restore state file tracking: {result.stderr}") 

340 return False 

341 

342 self.logger.debug(f"Restored tracking for {state_file}") 

343 return True 

344 

345 def _is_local_changes_error(self, error_output: str) -> bool: 

346 """Check if the error is due to uncommitted local changes. 

347 

348 Args: 

349 error_output: The stderr/stdout from the failed git command 

350 

351 Returns: 

352 True if the error indicates local changes would be overwritten 

353 """ 

354 indicators = [ 

355 "Your local changes to the following files would be overwritten", 

356 "Please commit your changes or stash them before you merge", 

357 "error: cannot pull with rebase: You have unstaged changes", 

358 ] 

359 return any(indicator in error_output for indicator in indicators) 

360 

361 def _is_untracked_files_error(self, error_output: str) -> bool: 

362 """Check if the error is due to untracked files blocking merge. 

363 

364 Args: 

365 error_output: The stderr/stdout from the failed git command 

366 

367 Returns: 

368 True if the error indicates untracked files would be overwritten 

369 """ 

370 indicators = [ 

371 "untracked working tree files would be overwritten by merge", 

372 "Please move or remove them before you merge", 

373 ] 

374 return any(indicator in error_output for indicator in indicators) 

375 

376 def _is_index_error(self, error_output: str) -> bool: 

377 """Check if the error is due to a corrupted git index. 

378 

379 Args: 

380 error_output: The stderr/stdout from the failed git command 

381 

382 Returns: 

383 True if the error indicates index problems 

384 """ 

385 indicators = [ 

386 "you need to resolve your current index first", 

387 "fatal: cannot do a partial commit during a merge", 

388 "error: you have not concluded your merge", 

389 ] 

390 return any(indicator in error_output for indicator in indicators) 

391 

392 def _is_rebase_in_progress(self) -> bool: 

393 """Check if a rebase is currently in progress. 

394 

395 Returns: 

396 True if rebase is in progress (rebase-merge or rebase-apply exists) 

397 """ 

398 rebase_merge = self.repo_path / ".git" / "rebase-merge" 

399 rebase_apply = self.repo_path / ".git" / "rebase-apply" 

400 return rebase_merge.exists() or rebase_apply.exists() 

401 

402 def _abort_rebase_if_in_progress(self) -> bool: 

403 """Abort any in-progress rebase operation. 

404 

405 Returns: 

406 True if rebase was aborted or none was in progress, 

407 False if abort failed 

408 """ 

409 if not self._is_rebase_in_progress(): 

410 return True 

411 

412 self.logger.warning("Detected rebase in progress, aborting...") 

413 abort_result = self._git_lock.run( 

414 ["rebase", "--abort"], 

415 cwd=self.repo_path, 

416 timeout=30, 

417 ) 

418 

419 if abort_result.returncode != 0: 

420 self.logger.error(f"Failed to abort rebase: {abort_result.stderr}") 

421 # Force hard reset as last resort 

422 return self._attempt_hard_reset() 

423 

424 self.logger.info("Aborted incomplete rebase from pull") 

425 return True 

426 

427 def _is_unmerged_files_error(self, error_output: str) -> bool: 

428 """Check if the error is due to pre-existing unmerged files. 

429 

430 Args: 

431 error_output: The stderr/stdout from the failed git command 

432 

433 Returns: 

434 True if the error indicates unmerged files blocking the operation 

435 """ 

436 indicators = [ 

437 "you have unmerged files", 

438 "Merging is not possible because you have unmerged files", 

439 "fix conflicts and then commit the result", 

440 ] 

441 return any(indicator in error_output for indicator in indicators) 

442 

443 def _detect_conflict_commit(self, error_output: str) -> str | None: 

444 """Extract commit hash from rebase conflict output. 

445 

446 Looks for patterns like: 

447 - "dropping ae3b85ec1cac501058f6e5da362be37be1c99801 feat(ai): add stall detectio" 

448 

449 Args: 

450 error_output: The stderr/stdout from the failed git pull --rebase 

451 

452 Returns: 

453 The 40-character commit hash if found, None otherwise 

454 """ 

455 import re 

456 

457 # Pattern: "dropping <40-char-hash>" followed by space and message 

458 # Match only full 40-char hashes to avoid false positives 

459 match = re.search(r"dropping\s+([a-f0-9]{40})\s+", error_output, re.IGNORECASE) 

460 return match.group(1) if match else None 

461 

462 def _check_and_recover_index(self) -> bool: 

463 """Check git index health and attempt recovery if needed. 

464 

465 Returns: 

466 True if index is healthy or was recovered, False if unrecoverable 

467 """ 

468 # Check if we're in the middle of a merge 

469 merge_head = self.repo_path / ".git" / "MERGE_HEAD" 

470 if merge_head.exists(): 

471 self.logger.warning("Detected incomplete merge, aborting...") 

472 abort_result = self._git_lock.run( 

473 ["merge", "--abort"], 

474 cwd=self.repo_path, 

475 timeout=30, 

476 ) 

477 if abort_result.returncode != 0: 

478 self.logger.error(f"Failed to abort merge: {abort_result.stderr}") 

479 return False 

480 self.logger.info("Aborted incomplete merge") 

481 

482 # Check if we're in the middle of a rebase 

483 rebase_dir = self.repo_path / ".git" / "rebase-merge" 

484 rebase_apply = self.repo_path / ".git" / "rebase-apply" 

485 if rebase_dir.exists() or rebase_apply.exists(): 

486 self.logger.warning("Detected incomplete rebase, aborting...") 

487 abort_result = self._git_lock.run( 

488 ["rebase", "--abort"], 

489 cwd=self.repo_path, 

490 timeout=30, 

491 ) 

492 if abort_result.returncode != 0: 

493 self.logger.error(f"Failed to abort rebase: {abort_result.stderr}") 

494 return False 

495 self.logger.info("Aborted incomplete rebase") 

496 # Force reset after rebase abort - the abort can leave index in dirty state 

497 # This is defensive since unmerged file detection below may not trigger 

498 if not self._attempt_hard_reset(): 

499 return False 

500 

501 # Check for unmerged files in the index (UU, AA, DD, AU, UA, DU, UD prefixes) 

502 # These can persist even after merge --abort in some edge cases 

503 status_result = self._git_lock.run( 

504 ["status", "--porcelain"], 

505 cwd=self.repo_path, 

506 timeout=30, 

507 ) 

508 

509 if status_result.returncode != 0: 

510 self.logger.error(f"git status failed: {status_result.stderr}") 

511 return self._attempt_hard_reset() 

512 

513 # Debug logging to diagnose unmerged detection issues 

514 if status_result.stdout.strip(): 

515 self.logger.debug(f"Git status output: {status_result.stdout[:500]}") 

516 

517 # Check for unmerged entries (first two chars indicate index/worktree status) 

518 # Unmerged states: UU (both modified), AA (both added), DD (both deleted), 

519 # AU/UA (added by us/them), DU/UD (deleted by us/them) 

520 unmerged_prefixes = ("UU", "AA", "DD", "AU", "UA", "DU", "UD") 

521 has_unmerged = any( 

522 line[:2] in unmerged_prefixes 

523 for line in status_result.stdout.splitlines() 

524 if len(line) >= 2 

525 ) 

526 

527 if has_unmerged: 

528 self.logger.warning("Detected unmerged files in index, resetting...") 

529 if not self._attempt_hard_reset(): 

530 return False 

531 self.logger.info("Cleared unmerged files from index") 

532 

533 # Final safety check - if MERGE_HEAD still exists, force reset 

534 # This catches edge cases where abort succeeded but state is still dirty 

535 if merge_head.exists(): 

536 self.logger.warning("MERGE_HEAD persists after recovery attempts, forcing reset") 

537 if not self._attempt_hard_reset(): 

538 return False 

539 

540 return True 

541 

542 def _attempt_hard_reset(self) -> bool: 

543 """Attempt a hard reset to recover from index issues. 

544 

545 Returns: 

546 True if reset succeeded, False otherwise 

547 """ 

548 self.logger.warning("Attempting hard reset to recover...") 

549 reset_result = self._git_lock.run( 

550 ["reset", "--hard", "HEAD"], 

551 cwd=self.repo_path, 

552 timeout=30, 

553 ) 

554 if reset_result.returncode != 0: 

555 self.logger.error("Hard reset failed - manual intervention required") 

556 return False 

557 self.logger.info("Hard reset successful") 

558 return True 

559 

560 def _merge_loop(self) -> None: 

561 """Background thread loop for processing merge requests.""" 

562 while not self._shutdown_event.is_set(): 

563 try: 

564 # Wait for next merge request with timeout 

565 try: 

566 request = self._queue.get(timeout=1.0) 

567 except Empty: 

568 continue 

569 

570 # Process the merge 

571 self._process_merge(request) 

572 

573 except Exception as e: 

574 self.logger.error(f"Merge loop error: {e}") 

575 time.sleep(1.0) 

576 

def _process_merge(self, request: MergeRequest) -> None:
    """Process a single merge request.

    Sequence, all in the main repository:

    1. Refuse the request immediately if the circuit breaker is tripped.
    2. Check and recover the git index; a failure counts toward the
       circuit breaker (tripped at 3 consecutive failures).
    3. Mark the state file assume-unchanged and stash local changes.
    4. Check out the base branch and run ``git pull --rebase``. Pull
       failures are handled in place and do not abort the merge attempt.
    5. Merge the worker branch with ``--no-ff``; untracked-file problems
       and merge conflicts are delegated to their handlers.

    Any exception raised along the way is caught here, counted toward the
    circuit breaker and reported through ``_handle_failure``. Regardless of
    outcome, stashed changes are popped, state file tracking is restored
    and the current issue id is cleared.

    NOTE(review): this method only ever increments
    ``_consecutive_failures``; presumably it is reset elsewhere (e.g. when
    a merge is finalized) - confirm.

    Args:
        request: Merge request to process
    """
    result = request.worker_result
    with self._lock:
        self._current_issue_id = result.issue_id
    self.logger.info(f"Processing merge for {result.issue_id}")
    had_local_changes = False

    try:
        # Circuit breaker check
        if self._paused:
            self.logger.warning(
                f"Merge coordinator paused due to repeated failures. "
                f"Skipping {result.issue_id}. Manual intervention required."
            )
            self._handle_failure(request, "Merge coordinator paused - circuit breaker tripped")
            return

        request.status = MergeStatus.IN_PROGRESS

        # Health check: ensure git index is clean before proceeding
        if not self._check_and_recover_index():
            self._consecutive_failures += 1
            if self._consecutive_failures >= 3:
                self._paused = True
                self.logger.error(
                    f"Circuit breaker tripped after {self._consecutive_failures} consecutive failures. "
                    "Merge coordinator paused. Manual recovery required."
                )
            self._handle_failure(request, "Git index recovery failed")
            return

        # Mark state file as assume-unchanged to prevent pull --rebase conflicts
        # The orchestrator continuously updates the state file during processing
        self._mark_state_file_assume_unchanged()

        # Stash any local changes before merge operations
        had_local_changes = self._stash_local_changes()

        # Ensure we're on base branch in the main repo
        base = self.config.base_branch
        remote = self.config.remote_name
        checkout_result = self._git_lock.run(
            ["checkout", base],
            cwd=self.repo_path,
            timeout=30,
        )

        if checkout_result.returncode != 0:
            error_output = checkout_result.stderr + checkout_result.stdout
            if self._is_local_changes_error(error_output):
                # This shouldn't happen since we stashed, but handle it anyway
                self.logger.warning(
                    "Checkout failed due to local changes despite stash attempt"
                )
            # Only an index error is retried; every other checkout failure
            # (including the local-changes case logged above) raises.
            if self._is_index_error(error_output):
                # Try recovery
                if self._check_and_recover_index():
                    # Retry checkout
                    checkout_result = self._git_lock.run(
                        ["checkout", base],
                        cwd=self.repo_path,
                        timeout=30,
                    )
                    if checkout_result.returncode == 0:
                        self.logger.info("Recovered from index error, checkout succeeded")
                    else:
                        raise RuntimeError(
                            f"Failed to checkout {base} after recovery: "
                            f"{checkout_result.stderr}"
                        )
                else:
                    raise RuntimeError(f"Failed to checkout {base}: {checkout_result.stderr}")
            else:
                raise RuntimeError(f"Failed to checkout {base}: {checkout_result.stderr}")

        # Track if merge strategy was used during pull (for conflict handling)
        used_merge_strategy = False

        # Pull latest changes
        pull_result = self._git_lock.run(
            ["pull", "--rebase", remote, base],
            cwd=self.repo_path,
            timeout=60,
        )

        # Handle pull failures
        if pull_result.returncode != 0:
            error_output = pull_result.stderr + pull_result.stdout

            # Check if rebase conflicted - must abort before continuing
            if self._is_rebase_in_progress():
                conflict_commit = self._detect_conflict_commit(error_output)

                if conflict_commit and conflict_commit in self._problematic_commits:
                    # Known problematic commit - use merge strategy instead
                    self.logger.info(
                        f"Repeated rebase conflict with {conflict_commit[:8]}, "
                        f"using merge strategy (git pull --no-rebase)"
                    )
                    if not self._abort_rebase_if_in_progress():
                        raise RuntimeError("Failed to recover from rebase conflict during pull")

                    # Attempt merge strategy pull
                    merge_pull_result = self._git_lock.run(
                        ["pull", "--no-rebase", remote, base],
                        cwd=self.repo_path,
                        timeout=60,
                    )

                    if merge_pull_result.returncode != 0:
                        self.logger.warning(
                            f"Merge strategy pull also failed: {merge_pull_result.stderr[:200]}"
                        )
                        # Continue anyway - merge may still work or fail appropriately
                    else:
                        self.logger.info(
                            f"Merge strategy pull succeeded for {conflict_commit[:8]}"
                        )
                        used_merge_strategy = True

                else:
                    # First time seeing this conflict or couldn't extract commit
                    if conflict_commit:
                        self._problematic_commits.add(conflict_commit)
                        self.logger.warning(
                            f"New rebase conflict with {conflict_commit[:8]}, "
                            f"tracking for future merges (will use merge strategy on repeat)"
                        )
                    else:
                        # Nothing is added to _problematic_commits on this
                        # path: without a hash there is no key to record.
                        self.logger.warning(
                            "Rebase conflict detected (could not extract commit hash), "
                            "tracking for future merges"
                        )

                    # NOTE(review): indentation was lost in the extracted
                    # source; this group is placed inside the first-time
                    # conflict branch - confirm against the original file.
                    self.logger.warning(
                        f"Pull --rebase failed with conflicts: {error_output[:200]}"
                    )

                    if not self._abort_rebase_if_in_progress():
                        raise RuntimeError("Failed to recover from rebase conflict during pull")
                    # After aborting rebase, we're back to pre-pull state
                    # Continue without the pull - merge may still work or conflict
                    self.logger.info("Continuing without pull after rebase abort")

            elif self._is_local_changes_error(error_output):
                self.logger.warning(
                    f"Pull failed due to local changes, attempting re-stash: {error_output[:200]}"
                )
                # Re-stash any local changes that appeared during pull
                # (the pull itself is not retried afterwards)
                if self._stash_local_changes():
                    self.logger.info("Re-stashed local changes after pull conflict")
                    had_local_changes = True
            # For other pull failures, continue - merge will handle or fail

        # Safety check: ensure no unmerged files before merge attempt
        # This catches edge cases where previous operations left dirty state
        if not self._check_and_recover_index():
            raise RuntimeError(
                "Git index has unresolved conflicts before merge - recovery failed"
            )

        # Attempt merge with no-ff
        merge_result = self._git_lock.run(
            [
                "merge",
                result.branch_name,
                "--no-ff",
                "-m",
                f"feat: parallel merge {result.issue_id}\n\n"
                f"Automated merge from parallel issue processing.",
            ],
            cwd=self.repo_path,
            timeout=60,
        )

        if merge_result.returncode != 0:
            error_output = merge_result.stderr + merge_result.stdout

            # Check for local changes error (shouldn't happen after stash)
            if self._is_local_changes_error(error_output):
                self.logger.warning(
                    f"Merge blocked by local changes despite stash: {error_output[:200]}"
                )
                raise RuntimeError(f"Merge failed due to local changes: {error_output[:200]}")

            # Check for untracked files blocking merge
            if self._is_untracked_files_error(error_output):
                self._handle_untracked_conflict(request, error_output)
                return

            # Check for merge conflict (including unmerged files from current merge)
            # Unmerged files at this point are genuine conflicts from the current merge
            # attempt, not leftover state from previous operations (those are cleaned up
            # by _check_and_recover_index() at the start of _process_merge)
            if self._is_unmerged_files_error(error_output) or "CONFLICT" in error_output:
                self._handle_conflict(request, used_merge_strategy)
                return
            else:
                raise RuntimeError(f"Merge failed: {merge_result.stderr}")

        # Merge successful
        self._finalize_merge(request)

    except Exception as e:
        # Every unexpected failure counts toward the circuit breaker and is
        # reported as a failed merge rather than propagated to the loop.
        self._consecutive_failures += 1
        if self._consecutive_failures >= 3:
            self._paused = True
            self.logger.error(
                f"Circuit breaker tripped after {self._consecutive_failures} consecutive failures. "
                "Merge coordinator paused. Manual recovery required."
            )
        self._handle_failure(request, str(e))

    finally:
        # Runs on every exit path above, including the early returns.
        # Always restore stashed changes
        if had_local_changes:
            self._pop_stash()
        # Always restore state file tracking
        self._restore_state_file_tracking()
        # Clear current issue tracking
        with self._lock:
            self._current_issue_id = None

807 

def _handle_conflict(self, request: MergeRequest, used_merge_strategy: bool = False) -> None:
    """Handle a merge conflict by rebasing the worker branch and retrying.

    The failed merge in the main repository is aborted first. Unless the
    retry budget is exhausted (or a rebase is known to be pointless), the
    worker's branch is rebased inside its worktree onto the freshly fetched
    base branch and the request is put back on the queue.

    Tracked changes in the worktree are stashed around the rebase. The
    stash is only popped again if this call actually created it: the
    stash stack is shared between worktrees of a repository, so popping
    without having pushed could consume an unrelated entry such as the
    main repository's auto-stash.

    Args:
        request: The merge request that conflicted
        used_merge_strategy: If True, merge strategy was used during pull and
            rebase retry should be skipped (rebase would fail on same conflicts)
    """
    result = request.worker_result
    request.retry_count += 1

    # Abort the failed merge (best effort; the result is not inspected)
    self._git_lock.run(
        ["merge", "--abort"],
        cwd=self.repo_path,
        timeout=10,
    )

    # Skip rebase retry if merge strategy was used during pull (BUG-079)
    # Rebase would fail on the same commits that caused the initial conflict
    if used_merge_strategy:
        self.logger.warning(
            f"Merge conflict for {result.issue_id}, "
            f"skipping rebase retry (merge strategy was used during pull)"
        )
        self._handle_failure(
            request,
            "Merge conflict - rebase not attempted (would fail on same conflicts "
            "that required merge strategy)",
        )
        return

    if request.retry_count > self.config.max_merge_retries:
        self._handle_failure(
            request,
            f"Merge conflict after {request.retry_count} retries",
        )
        return

    # Attempt rebase in the worktree
    self.logger.warning(
        f"Merge conflict for {result.issue_id}, "
        f"attempting rebase (retry {request.retry_count}/{self.config.max_merge_retries})"
    )

    request.status = MergeStatus.RETRYING

    # Check for tracked changes in the worktree before rebase. Untracked
    # entries ("??") are ignored because a plain `git stash push` does not
    # stash them, so they can never produce a stash entry to pop later.
    worktree_status = subprocess.run(
        ["git", "status", "--porcelain"],
        cwd=result.worktree_path,
        capture_output=True,
        text=True,
        timeout=30,
    )
    has_tracked_changes = any(
        line and not line.startswith("??") for line in worktree_status.stdout.splitlines()
    )

    # True only when this call pushed a stash entry of its own.
    worktree_stashed = False
    if has_tracked_changes:
        self.logger.debug(
            f"Stashing worktree changes before rebase: {worktree_status.stdout[:200]}"
        )
        stash_result = subprocess.run(
            ["git", "stash", "push", "-m", "ll-parallel: auto-stash before rebase"],
            cwd=result.worktree_path,
            capture_output=True,
            text=True,
            timeout=30,
        )
        if stash_result.returncode != 0:
            self.logger.warning(f"Failed to stash worktree changes: {stash_result.stderr}")
        else:
            # git exits 0 but creates nothing when it finds nothing to save.
            worktree_stashed = "No local changes to save" not in stash_result.stdout

    # Fetch latest base branch before rebase (BUG-180)
    # Use remote/base if fetch succeeds, fall back to base if no remote
    base = self.config.base_branch
    remote = self.config.remote_name
    fetch_result = subprocess.run(
        ["git", "fetch", remote, base],
        cwd=result.worktree_path,
        capture_output=True,
        text=True,
        timeout=60,
    )
    rebase_target = f"{remote}/{base}" if fetch_result.returncode == 0 else base

    # Rebase the branch onto latest base branch (BUG-180)
    rebase_result = subprocess.run(
        ["git", "rebase", rebase_target],
        cwd=result.worktree_path,
        capture_output=True,
        text=True,
        timeout=120,
    )

    if rebase_result.returncode != 0:
        # Rebase also failed - abort and restore our stash (if any)
        subprocess.run(
            ["git", "rebase", "--abort"],
            cwd=result.worktree_path,
            capture_output=True,
            timeout=10,
        )
        if worktree_stashed:
            subprocess.run(
                ["git", "stash", "pop"],
                cwd=result.worktree_path,
                capture_output=True,
                timeout=30,
            )
        self._handle_failure(
            request,
            f"Rebase failed after merge conflict: {rebase_result.stderr}",
        )
        return

    # Rebase succeeded, restore stash if we made one, then retry merge
    if worktree_stashed:
        pop_result = subprocess.run(
            ["git", "stash", "pop"],
            cwd=result.worktree_path,
            capture_output=True,
            text=True,
            timeout=30,
        )
        if pop_result.returncode != 0:
            self.logger.error(
                f"Stash pop failed for {result.issue_id} after rebase: "
                f"{pop_result.stderr.strip()}"
            )
            # Log the stash contents before discarding them so the
            # changes can still be recovered from the log.
            show_result = subprocess.run(
                ["git", "stash", "show", "-p"],
                cwd=result.worktree_path,
                capture_output=True,
                text=True,
                timeout=30,
            )
            if show_result.returncode == 0 and show_result.stdout:
                self.logger.warning(
                    f"Dropping unrecoverable worktree stash for {result.issue_id}:\n"
                    f"{show_result.stdout[:2000]}"
                )
            # NOTE(review): indentation was lost in the extracted source;
            # the drop is kept unconditional within the pop-failed branch -
            # confirm against the original file.
            subprocess.run(
                ["git", "stash", "drop"],
                cwd=result.worktree_path,
                capture_output=True,
                timeout=10,
            )
            self._handle_failure(request, "Stash pop conflict after rebase")
            return

    # Re-queue so the merge is attempted again against the rebased branch
    self._queue.put(request)

955 

def _handle_untracked_conflict(self, request: MergeRequest, error_output: str) -> None:
    """Back up untracked files that block a merge, then re-queue the merge.

    Increments ``request.retry_count``. If the retry budget
    (``config.max_merge_retries``) is exhausted, or no file names can be
    parsed from ``error_output``, the request is marked failed instead.
    Otherwise every listed file that is present under ``repo_path`` is moved
    to ``<repo_path>/.ll-backup/<issue_id>/`` (keeping its relative path) and
    the request is put back on the merge queue with status ``RETRYING``.

    Args:
        request: The merge request that failed.
        error_output: Git error message containing the conflicting file list.
    """
    result = request.worker_result
    request.retry_count += 1

    if request.retry_count > self.config.max_merge_retries:
        self._handle_failure(
            request,
            f"Untracked file conflict after {request.retry_count} retries",
        )
        return

    # Parse conflicting files from the error message.
    # Format: "error: The following untracked working tree files would be overwritten..."
    # followed by one file path per line, then "Please move or remove them...".
    conflicting_files: list[str] = []
    in_file_list = False
    for raw_line in error_output.splitlines():
        line = raw_line.strip()
        if "untracked working tree files would be overwritten" in line:
            in_file_list = True
            continue
        if "Please move or remove them" in line:
            in_file_list = False
            continue
        if in_file_list and line and not line.startswith("error:"):
            conflicting_files.append(line)

    if not conflicting_files:
        self._handle_failure(
            request,
            f"Could not parse conflicting files from: {error_output[:200]}",
        )
        return

    # Create backup directory
    backup_dir = self.repo_path / ".ll-backup" / result.issue_id
    backup_dir.mkdir(parents=True, exist_ok=True)

    # Move conflicting files to backup
    moved_files: list[str] = []
    for file_path in conflicting_files:
        src = self.repo_path / file_path
        # Path.exists() follows symlinks and reports False for a dangling
        # link, yet such a link still occupies the path and blocks the merge;
        # is_symlink() makes sure it is moved out of the way as well.
        if src.exists() or src.is_symlink():
            dst = backup_dir / file_path
            dst.parent.mkdir(parents=True, exist_ok=True)
            shutil.move(str(src), str(dst))
            moved_files.append(file_path)

    if moved_files:
        self.logger.info(
            f"Backed up {len(moved_files)} conflicting untracked file(s) to {backup_dir}"
        )

    # Retry the merge
    self.logger.warning(
        f"Untracked files conflict for {result.issue_id}, "
        f"retrying after backup (attempt {request.retry_count}/{self.config.max_merge_retries})"
    )
    request.status = MergeStatus.RETRYING
    self._queue.put(request)

1024 

def _finalize_merge(self, request: MergeRequest) -> None:
    """Record a merge as successful and tidy up after it.

    Marks the request ``SUCCESS``, zeroes the consecutive-failure counter,
    appends the issue ID to the merged list under the coordinator lock,
    removes the worker's worktree and branch, and logs the outcome.

    Args:
        request: The completed merge request.
    """
    worker = request.worker_result
    request.status = MergeStatus.SUCCESS

    # A successful merge resets the circuit breaker.
    self._consecutive_failures = 0

    issue_id = worker.issue_id
    with self._lock:
        self._merged.append(issue_id)

    # Worktree and branch are no longer needed once merged.
    self._cleanup_worktree(worker.worktree_path, worker.branch_name)

    self.logger.success(f"Merged {worker.issue_id} successfully")

1044 

def _handle_failure(self, request: MergeRequest, error: str) -> None:
    """Mark a merge request as failed and record why.

    Sets the request's status to ``FAILED`` and stores ``error`` on it,
    records the message against the issue ID in the failure map (under the
    coordinator lock), and logs the failure.

    Args:
        request: The failed merge request.
        error: Error message describing the failure.
    """
    worker = request.worker_result
    request.status, request.error = MergeStatus.FAILED, error

    issue_id = worker.issue_id
    with self._lock:
        self._failed[issue_id] = error

    self.logger.error(f"Merge failed for {worker.issue_id}: {error}")

1060 

1061 def _cleanup_worktree(self, worktree_path: Path, branch_name: str) -> None: 

1062 """Clean up a merged worktree and its branch. 

1063 

1064 Args: 

1065 worktree_path: Path to the worktree 

1066 branch_name: Name of the branch to delete 

1067 """ 

1068 if not worktree_path.exists(): 

1069 return 

1070 

1071 # Remove worktree 

1072 self._git_lock.run( 

1073 ["worktree", "unlock", str(worktree_path)], 

1074 cwd=self.repo_path, 

1075 timeout=10, 

1076 ) 

1077 self._git_lock.run( 

1078 ["worktree", "remove", "--force", str(worktree_path)], 

1079 cwd=self.repo_path, 

1080 timeout=30, 

1081 ) 

1082 

1083 # Force delete directory if still exists 

1084 if worktree_path.exists(): 

1085 shutil.rmtree(worktree_path, ignore_errors=True) 

1086 

1087 # Delete the branch 

1088 if branch_name.startswith("parallel/"): 

1089 self._git_lock.run( 

1090 ["branch", "-D", branch_name], 

1091 cwd=self.repo_path, 

1092 timeout=10, 

1093 ) 

1094 

1095 @property 

1096 def merged_ids(self) -> list[str]: 

1097 """List of successfully merged issue IDs.""" 

1098 with self._lock: 

1099 return list(self._merged) 

1100 

1101 @property 

1102 def failed_merges(self) -> dict[str, str]: 

1103 """Mapping of failed issue IDs to error messages.""" 

1104 with self._lock: 

1105 return dict(self._failed) 

1106 

1107 @property 

1108 def pending_count(self) -> int: 

1109 """Number of pending merge requests.""" 

1110 return self._queue.qsize() 

1111 

1112 @property 

1113 def stash_pop_failures(self) -> dict[str, str]: 

1114 """Mapping of issue IDs to stash pop failure messages. 

1115 

1116 These represent issues where the merge succeeded but the user's 

1117 local changes could not be automatically restored and need manual 

1118 recovery via 'git stash pop'. 

1119 """ 

1120 with self._lock: 

1121 return dict(self._stash_pop_failures) 

1122 

1123 def wait_for_completion(self, timeout: float | None = None) -> bool: 

1124 """Wait for all pending merges to complete. 

1125 

1126 Waits until both: 

1127 1. The merge queue is empty (no pending requests) 

1128 2. No merge is actively being processed (_current_issue_id is None) 

1129 

1130 Args: 

1131 timeout: Maximum time to wait (None = forever) 

1132 

1133 Returns: 

1134 True if all merges completed, False if timeout 

1135 """ 

1136 start_time = time.time() 

1137 while True: 

1138 with self._lock: 

1139 active = self._current_issue_id 

1140 if self._queue.empty() and not active: 

1141 return True 

1142 if timeout and (time.time() - start_time) > timeout: 

1143 return False 

1144 time.sleep(0.5)