Coverage for little_loops / parallel / merge_coordinator.py: 84%
424 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""Merge coordinator for sequential integration of parallel worker changes.
3Handles merging completed worker branches back to main with conflict detection
4and automatic retry capability.
5"""
7from __future__ import annotations
9import shutil
10import subprocess
11import threading
12import time
13from pathlib import Path
14from queue import Empty, Queue
15from typing import TYPE_CHECKING
17from little_loops.parallel.git_lock import GitLock
18from little_loops.parallel.types import (
19 MergeRequest,
20 MergeStatus,
21 ParallelConfig,
22 WorkerResult,
23)
25if TYPE_CHECKING:
26 from little_loops.logger import Logger
29class MergeCoordinator:
30 """Sequential merge queue with conflict handling.
32 Processes merge requests one at a time to avoid conflicts. Supports
33 automatic rebase and retry on merge failures. Handles uncommitted local
34 changes by stashing them before merge operations.
36 Example:
37 >>> coordinator = MergeCoordinator(config, logger, repo_path)
38 >>> coordinator.start()
39 >>> coordinator.queue_merge(worker_result)
40 >>> # ... later ...
41 >>> coordinator.shutdown()
42 """
def __init__(
    self,
    config: ParallelConfig,
    logger: Logger,
    repo_path: Path | None = None,
    git_lock: GitLock | None = None,
) -> None:
    """Set up coordinator state; no background thread runs until ``start()``.

    Args:
        config: Parallel processing configuration (state file, branches, retries).
        logger: Logger that receives merge progress and errors.
        repo_path: Root of the git repository; ``Path.cwd()`` when omitted.
        git_lock: Lock shared for git commands; a new ``GitLock`` bound to
            *logger* is created when omitted.
    """
    # Collaborators
    self.config = config
    self.logger = logger
    self.repo_path = repo_path if repo_path else Path.cwd()
    self._git_lock = git_lock if git_lock else GitLock(logger)

    # Background thread plumbing
    self._queue: Queue[MergeRequest] = Queue()
    self._thread: threading.Thread | None = None
    self._shutdown_event = threading.Event()

    # Outcome bookkeeping; _lock is taken when shared records are updated
    self._lock = threading.Lock()
    self._merged: list[str] = []
    self._failed: dict[str, str] = {}
    self._stash_pop_failures: dict[str, str] = {}  # issue_id -> failure message

    # Per-merge git state
    self._stash_active = False  # True while an auto-stash awaits restoration
    self._assume_unchanged_active = False  # True while state file is assume-unchanged
    self._current_issue_id: str | None = None  # issue being merged (stash failure attribution)
    self._problematic_commits: set[str] = set()  # commits seen causing rebase conflicts

    # Circuit breaker
    self._consecutive_failures = 0
    self._paused = False  # set once the breaker trips
def start(self) -> None:
    """Launch the daemon merge thread; does nothing if one is already alive."""
    existing = self._thread
    if existing is not None and existing.is_alive():
        return

    # Re-arm the stop flag in case a previous shutdown() set it.
    self._shutdown_event.clear()
    worker = threading.Thread(
        target=self._merge_loop,
        name="merge-coordinator",
        daemon=True,
    )
    self._thread = worker
    worker.start()
    self.logger.info("Merge coordinator started")
def shutdown(self, wait: bool = True, timeout: float = 30.0) -> None:
    """Signal the merge thread to stop and optionally wait for it to exit.

    ``_merge_loop`` checks the shutdown event between requests, so a merge
    already in progress runs to completion; requests still waiting in the
    queue are not processed.

    Args:
        wait: If True, block (up to *timeout* seconds) until the thread exits.
        timeout: Maximum number of seconds to wait when *wait* is True.
    """
    thread = self._thread
    if thread is None:
        return

    self._shutdown_event.set()

    if wait and thread.is_alive():
        thread.join(timeout=timeout)
        if thread.is_alive():
            # join() returns silently when the timeout expires. Make it visible
            # that the old loop is still running, because the reference to it is
            # dropped below.
            self.logger.warning(
                f"Merge coordinator thread still running after {timeout}s shutdown timeout"
            )

    self._thread = None
    self.logger.info("Merge coordinator stopped")
def queue_merge(self, worker_result: WorkerResult) -> None:
    """Enqueue a finished worker's branch for sequential merging.

    Args:
        worker_result: Outcome of a completed worker; its ``issue_id`` and
            ``branch_name`` are echoed in the log line.
    """
    self._queue.put(MergeRequest(worker_result=worker_result))
    issue, branch = worker_result.issue_id, worker_result.branch_name
    self.logger.info(f"Queued merge for {issue} (branch: {branch})")
def _stash_local_changes(self) -> bool:
    """Stash uncommitted tracked changes in the main repo before a merge.

    Only tracked modifications are stashed. Untracked (``??``) entries are
    skipped; untracked files that would block the merge are handled later by
    ``_handle_untracked_conflict``.

    Paths excluded from the stash:
      1. The orchestrator state file (``config.state_file``), matched by exact
         path or by any path ending with the state file's name.
      2. ``ll-context-state.json`` (Claude Code context state, managed externally).

    The stash is pushed with an explicit pathspec list rather than "stash
    everything", so the excluded files are never captured even if they change
    between the status check and the stash.

    Returns:
        True if a stash entry was created; False if there was nothing to stash
        or ``git stash push`` failed.
    """
    state_file_path = Path(self.config.state_file)
    state_file_str = str(state_file_path)
    state_file_name = state_file_path.name

    def _unquote_porcelain(path: str) -> str:
        """Undo git's C-style quoting of a porcelain path (e.g. names with spaces)."""
        if len(path) < 2 or not (path.startswith('"') and path.endswith('"')):
            return path
        inner = path[1:-1]
        try:
            # Escapes (\t, \", \\, octal UTF-8 bytes such as \303\251) decode to
            # latin-1 code points; re-encoding recovers the original UTF-8 name.
            return (
                inner.encode("utf-8")
                .decode("unicode_escape")
                .encode("latin-1")
                .decode("utf-8")
            )
        except UnicodeError:
            return inner

    status_result = self._git_lock.run(
        ["status", "--porcelain"],
        cwd=self.repo_path,
        timeout=30,
    )

    tracked_changes = []
    files_to_stash = []
    # Do not .strip() the whole output: a leading space on the first line is
    # part of that line's two-character XY status code.
    for line in status_result.stdout.splitlines():
        if not line or line.startswith("??"):
            continue
        # Porcelain v1: "XY path", or "XY old -> new" for renames. The path
        # starts after the 2-char status code and one separating space. Quoted
        # paths must be unquoted, or the pathspec below would match nothing.
        file_path = _unquote_porcelain(line[3:].split(" -> ")[-1].strip())
        if file_path == state_file_str or file_path.endswith(state_file_name):
            continue  # Skip state file - orchestrator manages it independently
        if file_path.endswith("ll-context-state.json"):
            self.logger.debug(f"Skipping Claude context state file from stash: {file_path}")
            continue
        tracked_changes.append(line)
        files_to_stash.append(file_path)

    if not files_to_stash:
        return False  # No tracked changes to stash (excluding state file)

    self.logger.debug(f"Tracked files to stash: {tracked_changes[:10]}")

    stash_result = self._git_lock.run(
        [
            "stash",
            "push",
            "-m",
            "ll-parallel: auto-stash before merge",
            "--",
            *files_to_stash,
        ],
        cwd=self.repo_path,
        timeout=30,
    )

    if stash_result.returncode == 0:
        self._stash_active = True
        self.logger.info("Stashed local changes before merge")
        return True

    self.logger.error(f"Failed to stash local changes: {stash_result.stderr}")
    return False
def _pop_stash(self) -> bool:
    """Re-apply the auto-stash created by ``_stash_local_changes``, if any.

    HEAD is never reset here, so a completed merge survives even when the
    stash cannot be applied cleanly.

    Returns:
        True when no stash was active or it popped cleanly. False when
        ``git stash pop`` failed; the stash entry is then left for manual
        recovery and the failure is recorded against the current issue.
    """
    if not self._stash_active:
        return True

    popped = self._git_lock.run(["stash", "pop"], cwd=self.repo_path, timeout=30)
    if popped.returncode == 0:
        self._stash_active = False
        self.logger.info("Restored stashed local changes")
        return True

    self.logger.warning(f"Failed to pop stash: {popped.stderr.strip()}")

    # A conflicting pop can leave unmerged index entries behind; look for them.
    status = self._git_lock.run(["status", "--porcelain"], cwd=self.repo_path, timeout=30)
    conflict_codes = ("UU", "AA", "DD", "AU", "UA", "DU", "UD")
    conflicted = False
    for entry in status.stdout.splitlines():
        if len(entry) >= 2 and entry[:2] in conflict_codes:
            conflicted = True
            break

    if conflicted:
        # Clear the unmerged state without touching HEAD (the merge commit).
        # NOTE(review): in a conflicted `git stash pop`, "ours" is the current
        # post-merge tree and "theirs" is the stashed side, so `--theirs` keeps
        # the stashed version of conflicted paths. Confirm that is intended.
        for cleanup in (["checkout", "--theirs", "."], ["reset", "HEAD"]):
            self._git_lock.run(cleanup, cwd=self.repo_path, timeout=30)
        self.logger.info("Cleaned up conflicted stash pop, merge preserved")

    # Stop tracking the stash; the entry stays in `git stash list` for the user.
    self._stash_active = False

    issue_id = self._current_issue_id
    if issue_id:
        # Remember the failure so it can be reported in the final summary.
        with self._lock:
            self._stash_pop_failures[issue_id] = (
                "Local changes could not be restored after merge. "
                "Run 'git stash list' and 'git stash pop' to recover manually."
            )

    self.logger.warning(
        "Stash could not be restored - your changes are saved in 'git stash list'. "
        "Run 'git stash show' to view and 'git stash pop' to retry manually."
    )
    return False
def _mark_state_file_assume_unchanged(self) -> bool:
    """Flag the state file ``--assume-unchanged`` so git ignores its local edits.

    The orchestrator keeps rewriting the state file while merges run; the flag
    lets ``git pull --rebase`` proceed despite those edits.

    Returns:
        True if the flag was set or the file is not tracked (nothing to do);
        False if ``git update-index`` failed.
    """
    target = str(self.config.state_file)

    listed = self._git_lock.run(["ls-files", target], cwd=self.repo_path, timeout=10)
    if not listed.stdout.strip():
        return True  # not tracked by git: nothing to flag

    flagged = self._git_lock.run(
        ["update-index", "--assume-unchanged", target],
        cwd=self.repo_path,
        timeout=10,
    )
    if flagged.returncode != 0:
        self.logger.warning(f"Failed to mark state file assume-unchanged: {flagged.stderr}")
        return False

    self._assume_unchanged_active = True
    self.logger.debug(f"Marked {target} as assume-unchanged")
    return True
def _restore_state_file_tracking(self) -> bool:
    """Clear the ``--assume-unchanged`` flag if this coordinator set it.

    Returns:
        True if nothing needed restoring or ``--no-assume-unchanged`` succeeded;
        False if the git command failed.
    """
    if self._assume_unchanged_active:
        target = str(self.config.state_file)
        outcome = self._git_lock.run(
            ["update-index", "--no-assume-unchanged", target],
            cwd=self.repo_path,
            timeout=10,
        )
        # The tracking flag is cleared whether or not git succeeded.
        self._assume_unchanged_active = False
        if outcome.returncode != 0:
            self.logger.warning(f"Failed to restore state file tracking: {outcome.stderr}")
            return False
        self.logger.debug(f"Restored tracking for {target}")
    return True
def _is_local_changes_error(self, error_output: str) -> bool:
    """Return True if git refused to run because of uncommitted local changes.

    Recognises checkout/merge refusals ("... would be overwritten") and
    ``git pull --rebase`` refusing because of unstaged changes or because the
    index holds staged-but-uncommitted changes.

    Args:
        error_output: The stderr/stdout from the failed git command.

    Returns:
        True if the output matches one of the known local-changes messages.
    """
    indicators = (
        "Your local changes to the following files would be overwritten",
        "Please commit your changes or stash them before you merge",
        "error: cannot pull with rebase: You have unstaged changes",
        # Staged-but-uncommitted changes block `pull --rebase` as well.
        "error: cannot pull with rebase: Your index contains uncommitted changes",
    )
    return any(indicator in error_output for indicator in indicators)
def _is_untracked_files_error(self, error_output: str) -> bool:
    """Tell whether git aborted because untracked files would be clobbered.

    Args:
        error_output: The stderr/stdout from the failed git command.

    Returns:
        True if the output contains git's untracked-files-overwritten message.
    """
    for marker in (
        "untracked working tree files would be overwritten by merge",
        "Please move or remove them before you merge",
    ):
        if marker in error_output:
            return True
    return False
def _is_index_error(self, error_output: str) -> bool:
    """Tell whether git failed because the index is mid-merge or otherwise unresolved.

    Args:
        error_output: The stderr/stdout from the failed git command.

    Returns:
        True if the output matches a known index-problem message.
    """
    markers = (
        "you need to resolve your current index first",
        "fatal: cannot do a partial commit during a merge",
        "error: you have not concluded your merge",
    )
    return any(map(error_output.__contains__, markers))
def _is_rebase_in_progress(self) -> bool:
    """Report whether the main repo has an unfinished rebase.

    Returns:
        True if ``.git/rebase-merge`` or ``.git/rebase-apply`` exists.
    """
    git_dir = self.repo_path / ".git"
    return any((git_dir / marker).exists() for marker in ("rebase-merge", "rebase-apply"))
def _abort_rebase_if_in_progress(self) -> bool:
    """Run ``git rebase --abort`` when a rebase is underway in the main repo.

    Returns:
        True if no rebase was in progress or the abort succeeded; otherwise
        the result of ``_attempt_hard_reset()`` as a last resort.
    """
    if not self._is_rebase_in_progress():
        return True

    self.logger.warning("Detected rebase in progress, aborting...")
    aborted = self._git_lock.run(["rebase", "--abort"], cwd=self.repo_path, timeout=30)
    if aborted.returncode == 0:
        self.logger.info("Aborted incomplete rebase from pull")
        return True

    self.logger.error(f"Failed to abort rebase: {aborted.stderr}")
    # Last resort when the abort itself fails.
    return self._attempt_hard_reset()
def _is_unmerged_files_error(self, error_output: str) -> bool:
    """Tell whether git refused to proceed because unmerged paths exist.

    Args:
        error_output: The stderr/stdout from the failed git command.

    Returns:
        True if the output matches a known unmerged-files message.
    """
    known_messages = [
        "you have unmerged files",
        "Merging is not possible because you have unmerged files",
        "fix conflicts and then commit the result",
    ]
    matched = [msg for msg in known_messages if msg in error_output]
    return bool(matched)
def _detect_conflict_commit(self, error_output: str) -> str | None:
    """Pull the full commit hash out of a "dropping <hash> <subject>" line.

    Example input line:
        "dropping ae3b85ec1cac501058f6e5da362be37be1c99801 feat(ai): add stall detectio"

    Args:
        error_output: The stderr/stdout from the failed ``git pull --rebase``.

    Returns:
        The 40-character hash if present, otherwise None.
    """
    import re

    # Require a full 40-hex-digit hash followed by whitespace so abbreviated
    # hashes or unrelated text do not produce false positives.
    found = re.search(r"dropping\s+([a-f0-9]{40})\s+", error_output, re.IGNORECASE)
    if found is None:
        return None
    return found.group(1)
def _check_and_recover_index(self) -> bool:
    """Ensure the main repo is not mid-merge/mid-rebase and has no unmerged paths.

    Recovery steps, in order:
      1. ``MERGE_HEAD`` present: ``git merge --abort`` (give up if it fails).
      2. ``rebase-merge``/``rebase-apply`` present: ``git rebase --abort``
         followed by a hard reset (give up if either fails).
      3. ``git status`` itself fails: return the outcome of a hard reset.
      4. Unmerged entries in ``git status --porcelain``: hard reset.
      5. ``MERGE_HEAD`` still present: hard reset.

    Returns:
        True if the index is healthy or was recovered, False if recovery failed.
    """
    git_dir = self.repo_path / ".git"
    merge_head = git_dir / "MERGE_HEAD"

    if merge_head.exists():
        self.logger.warning("Detected incomplete merge, aborting...")
        merge_abort = self._git_lock.run(["merge", "--abort"], cwd=self.repo_path, timeout=30)
        if merge_abort.returncode != 0:
            self.logger.error(f"Failed to abort merge: {merge_abort.stderr}")
            return False
        self.logger.info("Aborted incomplete merge")

    if (git_dir / "rebase-merge").exists() or (git_dir / "rebase-apply").exists():
        self.logger.warning("Detected incomplete rebase, aborting...")
        rebase_abort = self._git_lock.run(["rebase", "--abort"], cwd=self.repo_path, timeout=30)
        if rebase_abort.returncode != 0:
            self.logger.error(f"Failed to abort rebase: {rebase_abort.stderr}")
            return False
        self.logger.info("Aborted incomplete rebase")
        # An abort can leave the index dirty in ways the unmerged-path scan
        # below would miss, so reset defensively.
        if not self._attempt_hard_reset():
            return False

    status = self._git_lock.run(["status", "--porcelain"], cwd=self.repo_path, timeout=30)
    if status.returncode != 0:
        self.logger.error(f"git status failed: {status.stderr}")
        return self._attempt_hard_reset()

    if status.stdout.strip():
        self.logger.debug(f"Git status output: {status.stdout[:500]}")

    # XY codes for unmerged paths: both modified/added/deleted (UU/AA/DD), or
    # added/deleted by only one side (AU/UA/DU/UD).
    unmerged_codes = {"UU", "AA", "DD", "AU", "UA", "DU", "UD"}
    rows = status.stdout.splitlines()
    if any(len(row) >= 2 and row[:2] in unmerged_codes for row in rows):
        self.logger.warning("Detected unmerged files in index, resetting...")
        if not self._attempt_hard_reset():
            return False
        self.logger.info("Cleared unmerged files from index")

    # Final guard: an abort may report success yet leave MERGE_HEAD behind.
    if merge_head.exists():
        self.logger.warning("MERGE_HEAD persists after recovery attempts, forcing reset")
        if not self._attempt_hard_reset():
            return False

    return True
def _attempt_hard_reset(self) -> bool:
    """Run ``git reset --hard HEAD`` in the main repo as a last-resort recovery.

    This discards uncommitted tracked changes.

    Returns:
        True if the reset succeeded, False otherwise.
    """
    self.logger.warning("Attempting hard reset to recover...")
    outcome = self._git_lock.run(["reset", "--hard", "HEAD"], cwd=self.repo_path, timeout=30)
    succeeded = outcome.returncode == 0
    if succeeded:
        self.logger.info("Hard reset successful")
    else:
        self.logger.error("Hard reset failed - manual intervention required")
    return succeeded
def _merge_loop(self) -> None:
    """Body of the merge thread: take queued requests and process them one at a time.

    The queue is polled with a 1-second timeout so the shutdown event is
    re-checked regularly. Unexpected exceptions are logged and followed by a
    1-second pause before the loop continues.
    """
    while not self._shutdown_event.is_set():
        try:
            try:
                next_request = self._queue.get(timeout=1.0)
            except Empty:
                # Nothing arrived within the poll window; re-check shutdown.
                continue
            self._process_merge(next_request)
        except Exception as loop_error:
            self.logger.error(f"Merge loop error: {loop_error}")
            time.sleep(1.0)  # back off so a repeating failure cannot spin the loop
def _process_merge(self, request: MergeRequest) -> None:
    """Merge one worker branch into the base branch of the main repo.

    Steps:
      1. Circuit-breaker check.
      2. Index health check.
      3. Flag the state file assume-unchanged.
      4. Stash local tracked changes.
      5. Check out the base branch.
      6. ``git pull --rebase``, with a merge-strategy fallback for repeat
         conflict commits.
      7. A final index check.
      8. ``git merge --no-ff`` of the worker branch.

    Conflicts and untracked-file collisions are delegated to
    ``_handle_conflict`` / ``_handle_untracked_conflict``. Any other exception
    counts towards the circuit breaker and is reported via ``_handle_failure``.
    The auto-stash (if any) and state file tracking are restored in ``finally``
    whatever the outcome.

    Args:
        request: Merge request to process.
    """
    result = request.worker_result
    with self._lock:
        self._current_issue_id = result.issue_id
    self.logger.info(f"Processing merge for {result.issue_id}")
    had_local_changes = False

    try:
        # Circuit breaker: refuse work until someone intervenes manually.
        if self._paused:
            self.logger.warning(
                f"Merge coordinator paused due to repeated failures. "
                f"Skipping {result.issue_id}. Manual intervention required."
            )
            self._handle_failure(request, "Merge coordinator paused - circuit breaker tripped")
            return

        request.status = MergeStatus.IN_PROGRESS

        # Health check: the index must be clean before any other git operation.
        if not self._check_and_recover_index():
            self._consecutive_failures += 1
            if self._consecutive_failures >= 3:
                self._paused = True
                self.logger.error(
                    f"Circuit breaker tripped after {self._consecutive_failures} consecutive failures. "
                    "Merge coordinator paused. Manual recovery required."
                )
            self._handle_failure(request, "Git index recovery failed")
            return

        # The orchestrator keeps rewriting the state file; hide it from git so
        # `pull --rebase` is not blocked by it.
        self._mark_state_file_assume_unchanged()

        # Stash any local changes before merge operations.
        had_local_changes = self._stash_local_changes()

        # Ensure we're on the base branch in the main repo.
        base = self.config.base_branch
        remote = self.config.remote_name
        checkout_result = self._git_lock.run(
            ["checkout", base],
            cwd=self.repo_path,
            timeout=30,
        )

        if checkout_result.returncode != 0:
            error_output = checkout_result.stderr + checkout_result.stdout
            if self._is_local_changes_error(error_output):
                # Unexpected after stashing; only logged, the index check decides.
                self.logger.warning(
                    "Checkout failed due to local changes despite stash attempt"
                )
            if self._is_index_error(error_output):
                if self._check_and_recover_index():
                    checkout_result = self._git_lock.run(
                        ["checkout", base],
                        cwd=self.repo_path,
                        timeout=30,
                    )
                    if checkout_result.returncode == 0:
                        self.logger.info("Recovered from index error, checkout succeeded")
                    else:
                        raise RuntimeError(
                            f"Failed to checkout {base} after recovery: "
                            f"{checkout_result.stderr}"
                        )
                else:
                    raise RuntimeError(f"Failed to checkout {base}: {checkout_result.stderr}")
            else:
                raise RuntimeError(f"Failed to checkout {base}: {checkout_result.stderr}")

        # Set when the pull had to fall back to a merge; a rebase retry in
        # _handle_conflict would then hit the same conflicts.
        used_merge_strategy = False

        pull_result = self._git_lock.run(
            ["pull", "--rebase", remote, base],
            cwd=self.repo_path,
            timeout=60,
        )

        if pull_result.returncode != 0:
            error_output = pull_result.stderr + pull_result.stdout

            # A conflicted rebase must be aborted before anything else runs.
            if self._is_rebase_in_progress():
                conflict_commit = self._detect_conflict_commit(error_output)

                if conflict_commit and conflict_commit in self._problematic_commits:
                    # Known problematic commit: switch to a merge-strategy pull.
                    self.logger.info(
                        f"Repeated rebase conflict with {conflict_commit[:8]}, "
                        f"using merge strategy (git pull --no-rebase)"
                    )
                    if not self._abort_rebase_if_in_progress():
                        raise RuntimeError("Failed to recover from rebase conflict during pull")

                    merge_pull_result = self._git_lock.run(
                        ["pull", "--no-rebase", remote, base],
                        cwd=self.repo_path,
                        timeout=60,
                    )
                    if merge_pull_result.returncode != 0:
                        self.logger.warning(
                            f"Merge strategy pull also failed: {merge_pull_result.stderr[:200]}"
                        )
                        # Continue anyway: the index check below cleans up and
                        # the merge succeeds or fails on its own.
                    else:
                        self.logger.info(
                            f"Merge strategy pull succeeded for {conflict_commit[:8]}"
                        )
                        used_merge_strategy = True
                else:
                    # First time seeing this conflict, or no hash could be extracted.
                    if conflict_commit:
                        self._problematic_commits.add(conflict_commit)
                        self.logger.warning(
                            f"New rebase conflict with {conflict_commit[:8]}, "
                            f"tracking for future merges (will use merge strategy on repeat)"
                        )
                    else:
                        self.logger.warning(
                            "Rebase conflict detected (could not extract commit hash), "
                            "tracking for future merges"
                        )

                    self.logger.warning(
                        f"Pull --rebase failed with conflicts: {error_output[:200]}"
                    )

                    if not self._abort_rebase_if_in_progress():
                        raise RuntimeError("Failed to recover from rebase conflict during pull")
                    # Back to the pre-pull state; the merge may still work or conflict.
                    self.logger.info("Continuing without pull after rebase abort")

            elif self._is_local_changes_error(error_output):
                self.logger.warning(
                    f"Pull failed due to local changes, attempting re-stash: {error_output[:200]}"
                )
                if had_local_changes:
                    # This merge's auto-stash is still pending. A second push
                    # would orphan it: `finally` pops only once, because
                    # _stash_active is a single flag, not a count.
                    self.logger.warning(
                        "Skipping re-stash: an auto-stash from this merge is already pending"
                    )
                elif self._stash_local_changes():
                    self.logger.info("Re-stashed local changes after pull conflict")
                    had_local_changes = True
            # For other pull failures, continue - the merge will handle or fail.

        # Safety check: ensure no unmerged files remain before the merge attempt.
        if not self._check_and_recover_index():
            raise RuntimeError(
                "Git index has unresolved conflicts before merge - recovery failed"
            )

        merge_result = self._git_lock.run(
            [
                "merge",
                result.branch_name,
                "--no-ff",
                "-m",
                f"feat: parallel merge {result.issue_id}\n\n"
                f"Automated merge from parallel issue processing.",
            ],
            cwd=self.repo_path,
            timeout=60,
        )

        if merge_result.returncode != 0:
            error_output = merge_result.stderr + merge_result.stdout

            # Local changes should have been stashed; treat this as a hard failure.
            if self._is_local_changes_error(error_output):
                self.logger.warning(
                    f"Merge blocked by local changes despite stash: {error_output[:200]}"
                )
                raise RuntimeError(f"Merge failed due to local changes: {error_output[:200]}")

            if self._is_untracked_files_error(error_output):
                self._handle_untracked_conflict(request, error_output)
                return

            # Unmerged files here come from this merge attempt: leftovers from
            # earlier operations were cleared by _check_and_recover_index().
            if self._is_unmerged_files_error(error_output) or "CONFLICT" in error_output:
                self._handle_conflict(request, used_merge_strategy)
                return
            raise RuntimeError(f"Merge failed: {merge_result.stderr}")

        self._finalize_merge(request)

    except Exception as e:
        self._consecutive_failures += 1
        if self._consecutive_failures >= 3:
            self._paused = True
            self.logger.error(
                f"Circuit breaker tripped after {self._consecutive_failures} consecutive failures. "
                "Merge coordinator paused. Manual recovery required."
            )
        self._handle_failure(request, str(e))

    finally:
        # Always restore stashed changes and normal state file tracking.
        if had_local_changes:
            self._pop_stash()
        self._restore_state_file_tracking()
        with self._lock:
            self._current_issue_id = None
def _handle_conflict(self, request: MergeRequest, used_merge_strategy: bool = False) -> None:
    """Abort a conflicted merge; if retries remain, rebase the worker branch and requeue.

    Args:
        request: The merge request whose ``git merge`` conflicted. Its
            ``retry_count`` is incremented.
        used_merge_strategy: True when the pull fell back to
            ``git pull --no-rebase``. A rebase retry would fail on the same
            conflicts, so the request fails immediately (BUG-079).
    """
    result = request.worker_result
    request.retry_count += 1

    # Abort the failed merge in the main repo.
    self._git_lock.run(
        ["merge", "--abort"],
        cwd=self.repo_path,
        timeout=10,
    )

    if used_merge_strategy:
        self.logger.warning(
            f"Merge conflict for {result.issue_id}, "
            f"skipping rebase retry (merge strategy was used during pull)"
        )
        self._handle_failure(
            request,
            "Merge conflict - rebase not attempted (would fail on same conflicts "
            "that required merge strategy)",
        )
        return

    if request.retry_count > self.config.max_merge_retries:
        self._handle_failure(
            request,
            f"Merge conflict after {request.retry_count} retries",
        )
        return

    self.logger.warning(
        f"Merge conflict for {result.issue_id}, "
        f"attempting rebase (retry {request.retry_count}/{self.config.max_merge_retries})"
    )
    request.status = MergeStatus.RETRYING

    worktree_status = subprocess.run(
        ["git", "status", "--porcelain"],
        cwd=result.worktree_path,
        capture_output=True,
        text=True,
        timeout=30,
    )
    # A plain `git stash push` (no -u) saves only tracked changes, so
    # untracked (`??`) entries must not count as something to stash.
    has_tracked_changes = any(
        line and not line.startswith("??") for line in worktree_status.stdout.splitlines()
    )

    # refs/stash is shared by every worktree of the repository. The main repo's
    # own auto-stash can still be on the stack here, because _process_merge
    # pops it only after this method returns. So pop/drop later only if this
    # call really pushed an entry; otherwise we would act on someone else's stash.
    worktree_stashed = False
    if has_tracked_changes:
        self.logger.debug(
            f"Stashing worktree changes before rebase: {worktree_status.stdout[:200]}"
        )
        stash_result = subprocess.run(
            ["git", "stash", "push", "-m", "ll-parallel: auto-stash before rebase"],
            cwd=result.worktree_path,
            capture_output=True,
            text=True,
            timeout=30,
        )
        if stash_result.returncode != 0:
            self.logger.warning(f"Failed to stash worktree changes: {stash_result.stderr}")
        elif "No local changes to save" not in (stash_result.stdout + stash_result.stderr):
            worktree_stashed = True

    # Fetch the latest base branch before rebasing (BUG-180); fall back to the
    # local base branch when the fetch fails (e.g. no remote).
    base = self.config.base_branch
    remote = self.config.remote_name
    fetch_result = subprocess.run(
        ["git", "fetch", remote, base],
        cwd=result.worktree_path,
        capture_output=True,
        text=True,
        timeout=60,
    )
    rebase_target = f"{remote}/{base}" if fetch_result.returncode == 0 else base

    rebase_result = subprocess.run(
        ["git", "rebase", rebase_target],
        cwd=result.worktree_path,
        capture_output=True,
        text=True,
        timeout=120,
    )

    if rebase_result.returncode == 0:
        # Rebase succeeded: restore our stash (if any), then requeue the merge.
        if worktree_stashed:
            pop_result = subprocess.run(
                ["git", "stash", "pop"],
                cwd=result.worktree_path,
                capture_output=True,
                text=True,
                timeout=30,
            )
            if pop_result.returncode != 0:
                self.logger.error(
                    f"Stash pop failed for {result.issue_id} after rebase: "
                    f"{pop_result.stderr.strip()}"
                )
                # Log the stash contents before discarding them.
                show_result = subprocess.run(
                    ["git", "stash", "show", "-p"],
                    cwd=result.worktree_path,
                    capture_output=True,
                    text=True,
                    timeout=30,
                )
                if show_result.returncode == 0 and show_result.stdout:
                    self.logger.warning(
                        f"Dropping unrecoverable worktree stash for {result.issue_id}:\n"
                        f"{show_result.stdout[:2000]}"
                    )
                subprocess.run(
                    ["git", "stash", "drop"],
                    cwd=result.worktree_path,
                    capture_output=True,
                    timeout=10,
                )
                self._handle_failure(request, "Stash pop conflict after rebase")
                return
        self._queue.put(request)
    else:
        # Rebase failed as well: abort it and restore our stash (if any).
        subprocess.run(
            ["git", "rebase", "--abort"],
            cwd=result.worktree_path,
            capture_output=True,
            timeout=10,
        )
        if worktree_stashed:
            subprocess.run(
                ["git", "stash", "pop"],
                cwd=result.worktree_path,
                capture_output=True,
                timeout=30,
            )
        self._handle_failure(
            request,
            f"Rebase failed after merge conflict: {rebase_result.stderr}",
        )
def _handle_untracked_conflict(self, request: MergeRequest, error_output: str) -> None:
    """Move untracked files that block the merge into a backup dir, then requeue.

    Files go to ``<repo>/.ll-backup/<issue_id>/`` keeping their relative paths.

    Args:
        request: The merge request that failed; its ``retry_count`` is incremented.
        error_output: Git's error text, which lists the blocking files.
    """
    result = request.worker_result
    request.retry_count += 1

    if request.retry_count > self.config.max_merge_retries:
        self._handle_failure(
            request,
            f"Untracked file conflict after {request.retry_count} retries",
        )
        return

    # Git prints the offending paths between the "untracked working tree files
    # would be overwritten" header and the "Please move or remove them" footer.
    conflicting_files: list[str] = []
    collecting = False
    for raw_line in error_output.splitlines():
        text = raw_line.strip()
        if "untracked working tree files would be overwritten" in text:
            collecting = True
        elif "Please move or remove them" in text:
            collecting = False
        elif collecting and text and not text.startswith("error:"):
            conflicting_files.append(text)

    if not conflicting_files:
        self._handle_failure(
            request,
            f"Could not parse conflicting files from: {error_output[:200]}",
        )
        return

    backup_dir = self.repo_path / ".ll-backup" / result.issue_id
    backup_dir.mkdir(parents=True, exist_ok=True)

    moved_files = []
    for relative in conflicting_files:
        source = self.repo_path / relative
        if not source.exists():
            continue
        destination = backup_dir / relative
        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(str(source), str(destination))
        moved_files.append(relative)

    if moved_files:
        self.logger.info(
            f"Backed up {len(moved_files)} conflicting untracked file(s) to {backup_dir}"
        )

    self.logger.warning(
        f"Untracked files conflict for {result.issue_id}, "
        f"retrying after backup (attempt {request.retry_count}/{self.config.max_merge_retries})"
    )
    request.status = MergeStatus.RETRYING
    self._queue.put(request)
def _finalize_merge(self, request: MergeRequest) -> None:
    """Record a successful merge and clean up after it.

    Marks the request SUCCESS, resets the consecutive-failure counter used by
    the circuit breaker, appends the issue ID to the merged list while holding
    the coordinator lock, removes the worker's worktree and branch, and logs
    the success.

    Args:
        request: The merge request whose merge has completed
    """
    worker_result = request.worker_result
    issue_id = worker_result.issue_id

    request.status = MergeStatus.SUCCESS
    self._consecutive_failures = 0  # any success ends the current failure streak

    with self._lock:
        self._merged.append(issue_id)

    self._cleanup_worktree(worker_result.worktree_path, worker_result.branch_name)
    self.logger.success(f"Merged {issue_id} successfully")
def _handle_failure(self, request: MergeRequest, error: str) -> None:
    """Mark a merge request as failed and record the reason.

    Sets the request's status to FAILED and its ``error`` to the message,
    stores the message in the coordinator's failure map while holding the
    lock, then logs it.

    Args:
        request: The merge request that could not be completed
        error: Description of what went wrong
    """
    issue_id = request.worker_result.issue_id
    request.status, request.error = MergeStatus.FAILED, error

    with self._lock:
        self._failed[issue_id] = error

    self.logger.error(f"Merge failed for {issue_id}: {error}")
def _cleanup_worktree(self, worktree_path: Path, branch_name: str) -> None:
    """Clean up a merged worktree and its branch.

    If the worktree directory exists it is unlocked and force-removed via git;
    should the directory survive that, it is deleted directly. Whenever the
    directory disappeared without ``git worktree remove`` handling it (it was
    already missing, or had to be deleted by hand), ``git worktree prune`` is
    run so git stops treating the branch as checked out. Finally, branches
    under ``parallel/`` are force-deleted — even when the worktree directory
    was already gone, so the branch is not leaked.

    The results of the git commands are not inspected; each step is
    best-effort.

    Args:
        worktree_path: Path to the worktree
        branch_name: Name of the branch to delete
    """
    if worktree_path.exists():
        # Remove worktree
        self._git_lock.run(
            ["worktree", "unlock", str(worktree_path)],
            cwd=self.repo_path,
            timeout=10,
        )
        self._git_lock.run(
            ["worktree", "remove", "--force", str(worktree_path)],
            cwd=self.repo_path,
            timeout=30,
        )
        # Force delete directory if still exists
        needs_prune = worktree_path.exists()
        if needs_prune:
            shutil.rmtree(worktree_path, ignore_errors=True)
    else:
        needs_prune = True

    if needs_prune:
        # The directory went away behind git's back; drop the stale worktree
        # registration, otherwise `branch -D` refuses to delete a branch that
        # git still believes is checked out there.
        self._git_lock.run(
            ["worktree", "prune"],
            cwd=self.repo_path,
            timeout=30,
        )

    # Delete the branch
    if branch_name.startswith("parallel/"):
        self._git_lock.run(
            ["branch", "-D", branch_name],
            cwd=self.repo_path,
            timeout=10,
        )
@property
def merged_ids(self) -> list[str]:
    """Snapshot of the issue IDs merged so far.

    Returns a new list copied under the lock, so callers may mutate it
    without affecting the coordinator.
    """
    with self._lock:
        snapshot = list(self._merged)
    return snapshot
@property
def failed_merges(self) -> dict[str, str]:
    """Snapshot mapping each failed issue ID to its error message.

    The returned dict is a fresh copy taken under the lock.
    """
    with self._lock:
        snapshot = dict(self._failed)
    return snapshot
@property
def pending_count(self) -> int:
    """Number of merge requests currently waiting in the queue.

    Taken from ``Queue.qsize()``, which the standard library documents as
    approximate when other threads are putting or getting concurrently.
    """
    queued = self._queue.qsize()
    return queued
@property
def stash_pop_failures(self) -> dict[str, str]:
    """Snapshot mapping issue IDs to stash pop failure messages.

    Each entry is an issue whose merge succeeded but whose stashed local
    changes could not be restored automatically; the user must recover them
    manually via 'git stash pop'. The returned dict is a copy taken under
    the lock.
    """
    with self._lock:
        snapshot = dict(self._stash_pop_failures)
    return snapshot
def wait_for_completion(self, timeout: float | None = None) -> bool:
    """Wait for all pending merges to complete.

    Polls (every 0.5 s at most) until both:
    1. The merge queue is empty (no pending requests)
    2. No merge is actively being processed (_current_issue_id is None)

    The timeout is measured with a monotonic clock, so wall-clock adjustments
    do not shorten or extend the wait, and the final sleep is trimmed so the
    call does not overshoot the deadline.

    Args:
        timeout: Maximum seconds to wait. ``None`` waits forever; ``0`` (or a
            negative value) checks the state once and returns without waiting.

    Returns:
        True if all merges completed, False if timeout
    """
    poll_interval = 0.5
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        with self._lock:
            active = self._current_issue_id
            if self._queue.empty() and not active:
                return True

        if deadline is None:
            time.sleep(poll_interval)
            continue

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))