Coverage for little_loops / parallel / orchestrator.py: 84%
643 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""Main orchestrator for parallel issue processing.
3Coordinates the priority queue, worker pool, and merge coordinator to process
4multiple issues concurrently.
5"""
7from __future__ import annotations
9import json
10import os
11import re
12import shutil
13import signal
14import subprocess
15import tempfile
16import threading
17import time
18from datetime import UTC, datetime
19from pathlib import Path
20from typing import TYPE_CHECKING, Any
22from little_loops.events import EventBus
23from little_loops.frontmatter import update_frontmatter
24from little_loops.issue_parser import IssueInfo
25from little_loops.logger import Logger, format_duration
26from little_loops.parallel.git_lock import GitLock
27from little_loops.parallel.merge_coordinator import MergeCoordinator
28from little_loops.parallel.overlap_detector import OverlapDetector
29from little_loops.parallel.priority_queue import IssuePriorityQueue
30from little_loops.parallel.types import (
31 OrchestratorState,
32 ParallelConfig,
33 PendingWorktreeInfo,
34 WorkerResult,
35)
36from little_loops.parallel.worker_pool import WorkerPool
37from little_loops.session_log import append_session_log_entry
38from little_loops.worktree_utils import _is_ll_worktree
40if TYPE_CHECKING:
41 from little_loops.config import BRConfig
44class ParallelOrchestrator:
45 """Main controller for parallel issue processing.
47 Coordinates:
48 - Issue scanning and prioritization
49 - Worker dispatch (P0 sequential, P1-P5 parallel)
50 - Merge coordination
51 - State persistence for resume capability
52 - Graceful shutdown on signals
54 Example:
55 >>> from little_loops.config import BRConfig
56 >>> from little_loops.parallel import ParallelConfig, ParallelOrchestrator
57 >>> br_config = BRConfig(Path.cwd())
58 >>> parallel_config = ParallelConfig(max_workers=2)
59 >>> orchestrator = ParallelOrchestrator(parallel_config, br_config)
60 >>> exit_code = orchestrator.run()
61 """
def __init__(
    self,
    parallel_config: ParallelConfig,
    br_config: BRConfig,
    repo_path: Path | None = None,
    verbose: bool = True,
    wave_label: str | None = None,
    event_bus: EventBus | None = None,
) -> None:
    """Build the orchestrator together with the components it drives.

    Args:
        parallel_config: Settings for the parallel run.
        br_config: Project-level configuration.
        repo_path: Root of the git repository; the current working
            directory is used when omitted.
        verbose: Passed to the logger to control progress output.
        wave_label: Optional label (e.g., "Wave 1") included in status lines.
        event_bus: Optional bus that receives worker completion events.
    """
    self.parallel_config = parallel_config
    self.br_config = br_config
    self.repo_path = repo_path or Path.cwd()
    from little_loops.cli.output import use_color_enabled

    color_enabled = use_color_enabled()
    self.logger = Logger(verbose=verbose, use_color=color_enabled)
    self.wave_label = wave_label
    self._event_bus = event_bus
    self._execution_duration: float = 0.0

    # A single lock is handed to both the worker pool and the merge
    # coordinator so that git operations on the main repo are serialized
    # (avoids index.lock races).
    shared_git_lock = GitLock(self.logger)
    self._git_lock = shared_git_lock

    # Collaborating components
    self.queue = IssuePriorityQueue()
    self.worker_pool = WorkerPool(
        parallel_config, br_config, self.logger, self.repo_path, shared_git_lock
    )
    self.merge_coordinator = MergeCoordinator(
        parallel_config, self.logger, self.repo_path, shared_git_lock
    )

    # Run state, shutdown flag and the signal handlers we replace
    self.state = OrchestratorState()
    self._state_lock = threading.Lock()
    self._shutdown_requested = False
    self._original_sigint: Any = None
    self._original_sigterm: Any = None

    # Issue metadata kept for lifecycle completion once a merge lands
    self._issue_info_by_id: dict[str, IssueInfo] = {}
    # Interrupted issues are tracked apart from failures (ENH-036)
    self._interrupted_issues: list[str] = []
    # Failure reason per issue, written into the state file (BUG-1383)
    self._worker_errors: dict[str, str] = {}
    # issue_id -> branch_name for branches awaiting a PR when
    # use_feature_branches=True (ENH-665)
    self._pr_ready_branches: dict[str, str] = {}

    # Overlap detection is optional (ENH-143)
    self.overlap_detector: OverlapDetector | None
    if parallel_config.overlap_detection:
        self.overlap_detector = OverlapDetector(config=br_config.dependency_mapping)
    else:
        self.overlap_detector = None
    # Issues held back because of an overlap; re-checked as workers finish
    self._deferred_issues: list[IssueInfo] = []
    # Throttling/de-duplication data for status reports (ENH-262)
    self._last_status_time: float = 0.0
    self._last_status_line: str = ""
    # Timestamp of the last state write, used to throttle disk I/O (ENH-485)
    self._last_save_time: float = 0.0
135 @property
136 def execution_duration(self) -> float:
137 """Return the total execution duration in seconds."""
138 return self._execution_duration
def run(self) -> int:
    """Entry point: prepare the repository, then preview or process issues.

    Signal handlers are installed for the duration of the call and always
    restored afterwards; cleanup runs whether the run succeeds or not.

    Returns:
        Exit code (0 = success, 1 = failure)
    """
    config = self.parallel_config
    try:
        self._setup_signal_handlers()
        self._ensure_gitignore_entries()

        # Leftovers from earlier runs are only examined when this is not a
        # clean start.
        if not config.clean_start:
            pending_worktrees = self._check_pending_worktrees()
            if any(p.has_pending_work for p in pending_worktrees):
                if config.merge_pending:
                    self._merge_pending_worktrees(pending_worktrees)
                elif not config.ignore_pending:
                    # Default: only report; the cleanup below removes them.
                    self.logger.info(
                        "Continuing with cleanup (use --merge-pending to merge)..."
                    )

        self._cleanup_orphaned_worktrees()
        self._load_state()

        return self._dry_run() if config.dry_run else self._execute()

    except KeyboardInterrupt:
        self.logger.warning("Interrupted by user")
        return 1
    except Exception as e:
        self.logger.error(f"Fatal error: {e}")
        return 1
    finally:
        self._cleanup()
        self._restore_signal_handlers()
183 def _setup_signal_handlers(self) -> None:
184 """Setup signal handlers for graceful shutdown."""
185 self._original_sigint = signal.signal(signal.SIGINT, self._signal_handler)
186 self._original_sigterm = signal.signal(signal.SIGTERM, self._signal_handler)
188 def _restore_signal_handlers(self) -> None:
189 """Restore original signal handlers."""
190 if self._original_sigint is not None:
191 signal.signal(signal.SIGINT, self._original_sigint)
192 if self._original_sigterm is not None:
193 signal.signal(signal.SIGTERM, self._original_sigterm)
195 def _ensure_gitignore_entries(self) -> None:
196 """Ensure .gitignore has entries for parallel processing artifacts.
198 Adds entries for:
199 - .parallel-manage-state.json (state file)
200 - .worktrees/ (git worktree directory)
202 This prevents these files from being tracked by git, which would cause
203 conflicts during merge operations (state file is continuously updated).
204 """
205 gitignore_path = self.repo_path / ".gitignore"
206 required_entries = [
207 ".parallel-manage-state.json",
208 ".worktrees/",
209 ]
211 existing_content = ""
212 if gitignore_path.exists():
213 existing_content = gitignore_path.read_text()
215 # Check which entries are missing
216 missing_entries = []
217 for entry in required_entries:
218 # Check for exact match or pattern that would cover it
219 if entry not in existing_content:
220 missing_entries.append(entry)
222 if not missing_entries:
223 return
225 # Append missing entries
226 addition = "\n# ll-parallel artifacts\n"
227 for entry in missing_entries:
228 addition += f"{entry}\n"
230 # Ensure file ends with newline before adding
231 if existing_content and not existing_content.endswith("\n"):
232 addition = "\n" + addition
234 gitignore_path.write_text(existing_content + addition)
235 self.logger.info(f"Added {len(missing_entries)} entries to .gitignore")
def _cleanup_orphaned_worktrees(self) -> None:
    """Clean up worktrees from previous interrupted runs.

    Scans the worktree base directory and removes any worktrees that are
    not owned by a live process. This handles cases where a previous run
    was interrupted (Ctrl+C) and worktrees were not cleaned up.

    For each orphan, the checked-out branch is resolved first, while the
    directory still exists. The worktree is then unlocked and removed, and
    the branch is deleted if it is a "parallel/" branch. Ghost worktree
    references are pruned at the end.
    """
    worktree_base = self.repo_path / self.parallel_config.worktree_base
    if not worktree_base.exists():
        return

    # Get list of worktree directories, skipping those owned by live processes (BUG-579)
    orphaned = []
    for item in worktree_base.iterdir():
        if not (item.is_dir() and _is_ll_worktree(item.name)):
            continue
        # Check for a .ll-session-<pid> marker left by an active orchestrator
        owned_by_live = False
        for marker in item.glob(".ll-session-*"):
            try:
                pid = int(marker.name.split("-")[-1])
                os.kill(pid, 0)  # Signal 0: check if process exists
                owned_by_live = True
                break
            except (ProcessLookupError, ValueError):
                pass
            except PermissionError:
                owned_by_live = True  # Process exists, no permission to signal
                break
        if owned_by_live:
            self.logger.info(f"Skipping {item.name}: owned by running process")
            continue
        orphaned.append(item)

    if orphaned:
        self.logger.info(f"Cleaning up {len(orphaned)} orphaned worktree(s) from previous run")

        for worktree_path in orphaned:
            try:
                # Resolve the branch BEFORE the directory is removed: running
                # git with cwd set to a deleted directory raises, which used
                # to skip branch deletion and log a spurious failure.
                branch_name = None
                try:
                    branch_result = subprocess.run(
                        ["git", "rev-parse", "--abbrev-ref", "HEAD"],
                        cwd=worktree_path,
                        capture_output=True,
                        text=True,
                    )
                    if branch_result.returncode == 0:
                        branch_name = branch_result.stdout.strip()
                except OSError:
                    # Branch lookup is best-effort; still remove the worktree.
                    branch_name = None

                self._git_lock.run(
                    ["worktree", "unlock", str(worktree_path)],
                    cwd=self.repo_path,
                    timeout=10,
                )
                # Try git worktree remove first
                self._git_lock.run(
                    ["worktree", "remove", "--force", str(worktree_path)],
                    cwd=self.repo_path,
                    timeout=30,
                )

                # If git worktree remove failed, force delete the directory
                if worktree_path.exists():
                    shutil.rmtree(worktree_path, ignore_errors=True)

                # Only branches in the "parallel/" namespace are deleted
                if branch_name and branch_name.startswith("parallel/"):
                    self._git_lock.run(
                        ["branch", "-D", branch_name],
                        cwd=self.repo_path,
                        timeout=10,
                    )
            except Exception as e:
                self.logger.warning(f"Failed to clean up {worktree_path.name}: {e}")

        # Also prune git worktree references
        self._git_lock.run(
            ["worktree", "prune"],
            cwd=self.repo_path,
            timeout=30,
        )

    self._prune_ghost_worktree_refs()
def _prune_ghost_worktree_refs(self) -> None:
    """Prune worktree metadata whose directory is gone from disk.

    Covers the SIGKILL race in which a worker directory was deleted before
    git worktree prune ran, leaving .git/worktrees/<name>/ behind; a later
    git worktree add for the same path would fail with "already exists".
    """
    try:
        listing = self._git_lock.run(
            ["worktree", "list", "--porcelain"],
            cwd=self.repo_path,
            timeout=30,
        )
    except Exception as e:
        self.logger.warning(f"Failed to list worktrees for ghost ref scan: {e}")
        return

    ghost_names: list[str] = []
    record: dict[str, str] = {}
    # Records are separated by blank lines; the trailing "" sentinel flushes
    # the final record through the same code path.
    for line in [*listing.stdout.splitlines(), ""]:
        if line:
            key, _, value = line.partition(" ")
            record[key] = value
            continue
        if record:
            path_str = record.get("worktree", "")
            name = Path(path_str).name
            if _is_ll_worktree(name) and path_str and not Path(path_str).exists():
                ghost_names.append(name)
        record = {}

    if not ghost_names:
        return

    for ghost in ghost_names:
        self.logger.info(f"Pruned ghost ref: {ghost}")

    try:
        self._git_lock.run(
            ["worktree", "prune"],
            cwd=self.repo_path,
            timeout=30,
        )
    except Exception as e:
        self.logger.warning(f"Failed to prune ghost worktree refs: {e}")
def _inspect_worktree(self, worktree_path: Path) -> PendingWorktreeInfo | None:
    """Inspect a worktree to determine its status.

    Collects the checked-out branch, the issue ID derived from the
    directory name, the number of commits ahead of the configured base
    branch, and the files reported by ``git status --porcelain``.

    Args:
        worktree_path: Path to the worktree directory

    Returns:
        PendingWorktreeInfo if inspection succeeded, None if failed
    """
    try:
        # Read actual branch name from worktree via rev-parse
        branch_result = subprocess.run(
            ["git", "rev-parse", "--abbrev-ref", "HEAD"],
            cwd=worktree_path,
            capture_output=True,
            text=True,
        )
        if branch_result.returncode == 0:
            branch_name = branch_result.stdout.strip()
        else:
            # Fall back to string derivation if rev-parse fails
            branch_name = worktree_path.name.replace("worker-", "parallel/")

        # Extract issue ID (e.g., bug-045 -> BUG-045)
        # Pattern: worker-<issue-id>-<timestamp>
        match = re.match(r"worker-([a-z]+-\d+)-\d{8}-\d{6}", worktree_path.name)
        issue_id = match.group(1).upper() if match else worktree_path.name

        # Check commits ahead of main
        result = self._git_lock.run(
            ["rev-list", "--count", f"{self.parallel_config.base_branch}..{branch_name}"],
            cwd=self.repo_path,
            timeout=10,
        )
        commits_ahead = int(result.stdout.strip()) if result.returncode == 0 else 0

        # Check for uncommitted changes in worktree
        result = self._git_lock.run(
            ["status", "--porcelain"],
            cwd=worktree_path,
            timeout=10,
        )
        # Each porcelain line is "XY <path>", and X may itself be a space
        # (e.g. " M file"). The output is therefore split without stripping
        # it first: stripping the whole text would remove the leading space
        # of the first line and line[3:] would cut off the first character
        # of that path.
        status_lines: list[str] = []
        if result.returncode == 0:
            status_lines = [line for line in result.stdout.splitlines() if line.strip()]
        has_uncommitted = bool(status_lines)
        changed_files = [line[3:] for line in status_lines]

        return PendingWorktreeInfo(
            worktree_path=worktree_path,
            branch_name=branch_name,
            issue_id=issue_id,
            commits_ahead=commits_ahead,
            has_uncommitted_changes=has_uncommitted,
            changed_files=changed_files,
        )
    except Exception as e:
        self.logger.warning(f"Failed to inspect worktree {worktree_path.name}: {e}")
        return None
430 def _check_pending_worktrees(self) -> list[PendingWorktreeInfo]:
431 """Check for pending worktrees from previous runs and report status.
433 Returns:
434 List of pending worktree information
435 """
436 worktree_base = self.repo_path / self.parallel_config.worktree_base
437 if not worktree_base.exists():
438 return []
440 # Find all worker directories
441 worktrees = [
442 item for item in worktree_base.iterdir() if item.is_dir() and _is_ll_worktree(item.name)
443 ]
445 if not worktrees:
446 return []
448 self.logger.info("Checking for pending work from previous runs...")
450 # Inspect each worktree
451 pending_info: list[PendingWorktreeInfo] = []
452 for worktree_path in worktrees:
453 info = self._inspect_worktree(worktree_path)
454 if info:
455 pending_info.append(info)
457 # Report findings
458 with_work = [p for p in pending_info if p.has_pending_work]
459 if with_work:
460 self.logger.warning(f"Found {len(with_work)} worktree(s) with pending work:")
461 for info in with_work:
462 status_parts = []
463 if info.commits_ahead > 0:
464 status_parts.append(f"{info.commits_ahead} commit(s) ahead of main")
465 if info.has_uncommitted_changes:
466 status_parts.append(f"{len(info.changed_files)} uncommitted file(s)")
467 status = ", ".join(status_parts)
468 self.logger.warning(f" - {info.worktree_path.name}: {info.issue_id} ({status})")
470 self.logger.info("")
471 self.logger.info("Options:")
472 self.logger.info(" --merge-pending Attempt to merge pending work before continuing")
473 self.logger.info(" --clean-start Remove all worktrees and start fresh")
474 self.logger.info(
475 " --ignore-pending Continue without action (worktrees will be cleaned up)"
476 )
477 elif pending_info:
478 self.logger.info(f"Found {len(pending_info)} orphaned worktree(s) with no pending work")
480 return pending_info
482 def _merge_pending_worktrees(self, pending: list[PendingWorktreeInfo]) -> None:
483 """Attempt to merge pending worktrees from previous runs.
485 Args:
486 pending: List of pending worktree information
487 """
488 with_work = [p for p in pending if p.has_pending_work]
489 if not with_work:
490 return
492 self.logger.info(f"Attempting to merge {len(with_work)} pending worktree(s)...")
494 for info in with_work:
495 try:
496 # If there are uncommitted changes, commit them first
497 if info.has_uncommitted_changes:
498 self.logger.info(f" Committing uncommitted changes in {info.issue_id}...")
499 self._git_lock.run(
500 ["add", "-A"],
501 cwd=info.worktree_path,
502 timeout=30,
503 )
504 self._git_lock.run(
505 [
506 "commit",
507 "-m",
508 f"WIP: Auto-commit from interrupted session for {info.issue_id}",
509 ],
510 cwd=info.worktree_path,
511 timeout=30,
512 )
514 # Attempt merge
515 self.logger.info(f" Merging {info.issue_id} ({info.branch_name})...")
516 result = self._git_lock.run(
517 [
518 "merge",
519 "--no-ff",
520 info.branch_name,
521 "-m",
522 f"Merge pending work for {info.issue_id}",
523 ],
524 cwd=self.repo_path,
525 timeout=60,
526 )
528 if result.returncode == 0:
529 self.logger.success(f" Successfully merged {info.issue_id}")
530 # Clean up the worktree after successful merge
531 self._git_lock.run(
532 ["worktree", "remove", "--force", str(info.worktree_path)],
533 cwd=self.repo_path,
534 timeout=30,
535 )
536 self._git_lock.run(
537 ["branch", "-D", info.branch_name],
538 cwd=self.repo_path,
539 timeout=10,
540 )
541 else:
542 self.logger.warning(f" Failed to merge {info.issue_id}: {result.stderr}")
543 # Abort the merge if it failed
544 self._git_lock.run(
545 ["merge", "--abort"],
546 cwd=self.repo_path,
547 timeout=10,
548 )
550 except Exception as e:
551 self.logger.warning(f" Error merging {info.issue_id}: {e}")
553 def _signal_handler(self, signum: int, frame: object) -> None:
554 """Handle shutdown signals gracefully."""
555 self._shutdown_requested = True
556 # Propagate to worker pool for interrupted worker detection (ENH-036)
557 self.worker_pool.set_shutdown_requested(True)
558 self.logger.warning(f"Received signal {signum}, shutting down gracefully...")
560 def _load_state(self) -> None:
561 """Load state from file for resume capability."""
562 if self.parallel_config.clean_start:
563 self.state.started_at = datetime.now().isoformat()
564 return
565 state_file = self.repo_path / self.parallel_config.state_file
566 if not state_file.exists():
567 self.state.started_at = datetime.now().isoformat()
568 return
570 try:
571 data = json.loads(state_file.read_text())
572 self.state = OrchestratorState.from_dict(data)
574 # Restore queue state
575 self.queue.load_completed(self.state.completed_issues)
576 self.queue.load_failed(self.state.failed_issues.keys())
578 self.logger.info(
579 f"Resumed from previous state: "
580 f"{len(self.state.completed_issues)} completed, "
581 f"{len(self.state.failed_issues)} failed"
582 )
583 except Exception as e:
584 self.logger.warning(f"Could not load state: {e}")
585 self.state.started_at = datetime.now().isoformat()
587 def _save_state(self, force: bool = False) -> None:
588 """Save current state to file using an atomic write.
590 Writes are throttled to at most once every 5 seconds to reduce filesystem I/O
591 during high-frequency loop ticks (e.g., merge-waiting phase). Pass force=True
592 to bypass the throttle, e.g., on shutdown.
593 """
594 now = time.time()
595 if not force and now - self._last_save_time < 5.0:
596 return
597 self._last_save_time = now
598 with self._state_lock:
599 self.state.last_checkpoint = datetime.now().isoformat()
600 self.state.completed_issues = self.queue.completed_ids
601 self.state.failed_issues = {
602 issue_id: self._worker_errors.get(issue_id, "Failed")
603 for issue_id in self.queue.failed_ids
604 }
605 self.state.in_progress_issues = self.queue.in_progress_ids
607 state_file = self.repo_path / self.parallel_config.state_file
608 data = json.dumps(self.state.to_dict(), indent=2)
609 tmp_fd, tmp_path = tempfile.mkstemp(dir=state_file.parent, suffix=".tmp")
610 try:
611 with os.fdopen(tmp_fd, "w") as f:
612 f.write(data)
613 os.replace(tmp_path, state_file)
614 except Exception:
615 os.unlink(tmp_path)
616 raise
618 def _cleanup_state(self) -> None:
619 """Remove state file on successful completion."""
620 state_file = self.repo_path / self.parallel_config.state_file
621 if state_file.exists():
622 state_file.unlink()
def _dry_run(self) -> int:
    """Log the issues that a real run would process, without processing them.

    Issues are listed per priority, in the order given by
    IssuePriorityQueue.DEFAULT_PRIORITIES, followed by the configuration.

    Returns:
        Exit code (always 0 for dry run)
    """
    say = self.logger.info
    config = self.parallel_config
    issues = self._scan_issues()

    banner = "=" * 60
    say(banner)
    say("DRY RUN - No changes will be made")
    say(banner)
    say("")

    if not issues:
        say("No issues found matching criteria")
        return 0

    say(f"Found {len(issues)} issues to process:")
    say("")

    # Bucket the issues by their priority
    buckets: dict[str, list[IssueInfo]] = {}
    for issue in issues:
        if issue.priority not in buckets:
            buckets[issue.priority] = []
        buckets[issue.priority].append(issue)

    for priority in IssuePriorityQueue.DEFAULT_PRIORITIES:
        bucket = buckets.get(priority)
        if bucket is None:
            continue

        say(f" {priority} ({len(bucket)} issues):")
        for issue in bucket:
            if priority == "P0" and config.p0_sequential:
                mode = "sequential"
            else:
                mode = "parallel"
            say(f" - {issue.issue_id}: {issue.title} [{mode}]")

    say("")
    say("Configuration:")
    say(f" Workers: {config.max_workers}")
    say(f" P0 Sequential: {config.p0_sequential}")
    say(f" Max Issues: {config.max_issues or 'unlimited'}")
    say(f" Command Prefix: {config.command_prefix}")

    return 0
672 def _maybe_report_status(self) -> None:
673 """Report status if enough time has elapsed since last report.
675 Reports every 5 seconds during active processing for progress visibility (ENH-262).
676 Suppresses duplicate lines when nothing has changed.
677 """
678 now = time.time()
679 # Report every 5 seconds
680 if now - self._last_status_time < 5.0:
681 return
683 self._last_status_time = now
685 # Build status line
686 parts = []
688 # Add wave label if present
689 if self.wave_label:
690 parts.append(f"{self.wave_label}")
692 # Get queue counts
693 in_progress = len(self.queue.in_progress_ids)
694 completed = self.queue.completed_count
695 failed = self.queue.failed_count
696 pending_merge = self.merge_coordinator.pending_count
698 parts.append(f"Active: {in_progress}")
699 parts.append(f"Done: {completed}")
700 if failed > 0:
701 parts.append(f"Failed: {failed}")
702 if pending_merge > 0:
703 parts.append(f"Merging: {pending_merge}")
705 # Build status line
706 status = " | ".join(parts)
708 # Get active worker stages
709 active_stages = self.worker_pool.get_active_stages()
711 # Add worker details if any are active
712 if active_stages:
713 # Group by stage
714 by_stage: dict[str, list[str]] = {}
715 for issue_id, worker_stage in active_stages.items():
716 stage_name = worker_stage.value.title()
717 by_stage.setdefault(stage_name, []).append(issue_id)
719 stage_parts = []
720 for stage_name in ["Validating", "Implementing", "Verifying", "Merging"]:
721 if stage_name in by_stage:
722 issue_ids = ", ".join(by_stage[stage_name])
723 stage_parts.append(f"{stage_name}: [{issue_ids}]")
725 if stage_parts:
726 status += " | " + " | ".join(stage_parts)
728 # Skip if nothing changed since last report
729 if status == self._last_status_line:
730 return
731 self._last_status_line = status
733 # Log with gray color to distinguish from normal logs
734 self.logger.debug(status)
def _execute(self) -> int:
    """Run the main dispatch loop until the work is done or shutdown is requested.

    Returns:
        Exit code (0 = success, 1 = failure)
    """
    start_time = time.time()

    issues = self._scan_issues()
    if not issues:
        self.logger.info("No issues to process")
        return 0

    # Keep the issue metadata for lifecycle completion after merge
    self._issue_info_by_id.update({entry.issue_id: entry for entry in issues})

    queued_total = self.queue.add_many(issues)
    self.logger.info(f"Queued {queued_total} issues for processing")

    self.worker_pool.start()
    self.merge_coordinator.start()

    dispatched = 0
    limit = self.parallel_config.max_issues or float("inf")

    while not self._shutdown_requested:
        # Finished when nothing is queued, running, or waiting to merge
        if (
            self.queue.empty()
            and self.worker_pool.active_count == 0
            and self.merge_coordinator.pending_count == 0
        ):
            break

        if dispatched >= limit:
            self.logger.info(f"Reached max issues limit ({limit})")
            break

        # Hand out another issue when a worker slot is free
        if self.worker_pool.active_count < self.parallel_config.max_workers:
            queued = self.queue.get(block=False)
            if queued:
                issue = queued.issue_info
                run_sequentially = (
                    issue.priority == "P0" and self.parallel_config.p0_sequential
                )
                if run_sequentially:
                    self._process_sequential(issue)
                else:
                    self._process_parallel(issue)
                dispatched += 1

        # Both calls throttle themselves internally
        self._save_state()
        self._maybe_report_status()  # progress visibility (ENH-262)

        # Avoid spinning
        time.sleep(0.1)

    self._wait_for_completion()
    self._report_results(start_time)

    # The state file is only removed after a clean, uninterrupted run
    if not self._shutdown_requested and self.queue.failed_count == 0:
        self._cleanup_state()

    return 1 if self.queue.failed_count != 0 else 0
def _scan_issues(self) -> list[IssueInfo]:
    """Collect the issues to process, honoring filters and skip lists.

    Issues already recorded as completed or failed in the loaded state are
    skipped, as are IDs listed in the configuration.

    Returns:
        Issues as returned by the queue scanner, cut to max_issues when
        that limit is positive
    """
    config = self.parallel_config

    already_done = set(self.state.completed_issues)
    already_failed = set(self.state.failed_issues.keys())
    skip_ids = already_done | already_failed
    if config.skip_ids:
        skip_ids |= config.skip_ids

    issues = IssuePriorityQueue.scan_issues(
        self.br_config,
        priority_filter=list(config.priority_filter),
        skip_ids=skip_ids,
        only_ids=config.only_ids,
        type_prefixes=config.type_prefixes,
        label_filter=config.label_filter,
    )

    if config.max_issues > 0:
        return issues[: config.max_issues]
    return issues
838 def _process_sequential(self, issue: IssueInfo) -> None:
839 """Process an issue sequentially (blocking).
841 Args:
842 issue: Issue to process
843 """
844 self.logger.info(f"Processing {issue.issue_id} sequentially (P0)")
846 # Wait for any parallel work to finish
847 while self.worker_pool.active_count > 0:
848 time.sleep(0.5)
850 # Process in main repo (no worktree isolation needed)
851 # Note: No callback here - _merge_sequential handles the result explicitly
852 # to avoid double-processing (callback would also queue merge/close)
853 future = self.worker_pool.submit(issue)
855 # Wait for completion
856 try:
857 result = future.result(timeout=self.parallel_config.timeout_per_issue)
858 if result.success:
859 # Merge immediately for P0
860 self._merge_sequential(result)
861 except Exception as e:
862 self.logger.error(f"Sequential processing failed: {e}")
863 self.queue.mark_failed(issue.issue_id)
865 def _process_parallel(self, issue: IssueInfo) -> None:
866 """Process an issue in parallel (non-blocking).
868 Args:
869 issue: Issue to process
870 """
871 # Check for overlaps if enabled (ENH-143)
872 if self.overlap_detector:
873 overlap = self.overlap_detector.check_overlap(issue)
874 if overlap:
875 if self.parallel_config.serialize_overlapping:
876 self.logger.warning(
877 f"Deferring {issue.issue_id} - overlaps with {overlap.overlapping_issues}"
878 )
879 # Track for re-check when active issues complete
880 self._deferred_issues.append(issue)
881 return
882 else:
883 self.logger.warning(
884 f"Warning: {issue.issue_id} may conflict with {overlap.overlapping_issues}"
885 )
887 # Register as active before dispatch
888 self.overlap_detector.register_issue(issue)
890 self.logger.info(f"Dispatching {issue.issue_id} to worker pool")
891 self.worker_pool.submit(issue, self._on_worker_complete)
893 def _on_worker_complete(self, result: WorkerResult) -> None:
894 """Callback when a worker completes.
896 Args:
897 result: Result from the worker
898 """
899 # Unregister from overlap tracking (ENH-143)
900 if self.overlap_detector:
901 self.overlap_detector.unregister_issue(result.issue_id)
902 # Re-queue deferred issues that were waiting on this one
903 self._requeue_deferred_issues()
905 # Handle interrupted workers (not counted as failed) - ENH-036
906 if result.interrupted:
907 self.logger.info(f"{result.issue_id} was interrupted during shutdown (can retry)")
908 self._interrupted_issues.append(result.issue_id)
909 # Don't mark as failed - they can be retried on next run
910 return
912 # Handle issue closure (no merge needed)
913 if result.should_close:
914 # Lazy import to avoid circular dependency
915 from little_loops.issue_lifecycle import close_issue
917 self.logger.info(f"{result.issue_id} should be closed: {result.close_status}")
918 info = self._issue_info_by_id.get(result.issue_id)
919 if info:
920 if close_issue(
921 info,
922 self.br_config,
923 self.logger,
924 result.close_reason,
925 result.close_status,
926 interceptors=None,
927 ):
928 self.queue.mark_completed(result.issue_id)
929 else:
930 self._worker_errors[result.issue_id] = (
931 f"Close failed: {result.close_reason or 'close error'}"
932 )
933 self.queue.mark_failed(result.issue_id)
934 else:
935 self.logger.warning(f"No issue info found for {result.issue_id}")
936 self._worker_errors[result.issue_id] = "Close failed: no issue info"
937 self.queue.mark_failed(result.issue_id)
938 elif result.success:
939 self.logger.success(
940 f"{result.issue_id} completed in {format_duration(result.duration)}"
941 )
942 if result.was_corrected:
943 self.logger.info(f"{result.issue_id} was auto-corrected during validation")
944 # Log and store corrections for pattern analysis (ENH-010)
945 for correction in result.corrections:
946 self.logger.info(f" Correction: {correction}")
947 if result.corrections:
948 with self._state_lock:
949 self.state.corrections[result.issue_id] = result.corrections
950 if self.parallel_config.use_feature_branches:
951 # Feature branch mode: skip auto-merge, branch stays alive for a PR (ENH-665)
952 self.logger.info(f"{result.issue_id}: feature branch ready — {result.branch_name}")
953 self.queue.mark_completed(result.issue_id)
954 self._complete_issue_lifecycle_if_needed(result.issue_id)
955 with self._state_lock:
956 self._pr_ready_branches[result.issue_id] = result.branch_name
957 else:
958 self.merge_coordinator.queue_merge(result)
959 # Wait for merge to complete before returning from callback.
960 # This prevents dispatch of next worker while merge is in progress,
961 # avoiding race conditions between worktree creation and merge ops.
962 # (BUG-140: Race condition between worktree creation and merge)
963 self.merge_coordinator.wait_for_completion(timeout=120)
964 if result.issue_id in self.merge_coordinator.merged_ids:
965 self.queue.mark_completed(result.issue_id)
966 self._complete_issue_lifecycle_if_needed(result.issue_id)
967 else:
968 self._worker_errors[result.issue_id] = (
969 f"Merge failed: {result.error or 'merge error'}"
970 )
971 self.queue.mark_failed(result.issue_id)
972 else:
973 self.logger.error(f"{result.issue_id} failed: {result.error}")
974 self._worker_errors[result.issue_id] = result.error or "Failed"
975 self.queue.mark_failed(result.issue_id)
977 # Update timing
978 with self._state_lock:
979 self.state.timing[result.issue_id] = {
980 "total": result.duration,
981 }
983 # Clean up stage tracking after callback completes (ENH-262)
984 # Delay briefly so status reporter can show completion
985 self.worker_pool.remove_worker_stage(result.issue_id)
987 # Emit worker completion event for extensions (ENH-921)
988 if self._event_bus:
989 self._event_bus.emit(
990 {
991 "event": "parallel.worker_completed",
992 "ts": datetime.now(UTC).isoformat(),
993 "issue_id": result.issue_id,
994 "worker_name": result.worktree_path.name,
995 "status": "success" if result.success else "failure",
996 "duration_seconds": result.duration,
997 }
998 )
def _requeue_deferred_issues(self) -> None:
    """Re-queue deferred issues that no longer have overlaps (ENH-143).

    Every deferred issue is re-checked with the overlap detector. Issues that
    still overlap stay deferred; all others are added back to the queue.

    When no overlap detector is configured, no overlap can be reported, so
    every deferred issue is re-queued. Previously such issues were neither
    kept nor re-queued and were silently lost.
    """
    if not self._deferred_issues:
        return

    detector = self.overlap_detector
    still_deferred = []
    for issue in self._deferred_issues:
        if detector and detector.check_overlap(issue):
            # Still has overlaps, keep deferred
            still_deferred.append(issue)
            continue

        # No (more) overlaps, add back to queue
        self.logger.info(f"Re-queuing {issue.issue_id} - no longer overlapping")
        self.queue.add(issue)

    self._deferred_issues = still_deferred
def _merge_sequential(self, result: WorkerResult) -> None:
    """Finish a sequential (P0) result right away.

    A result flagged ``should_close`` is closed via ``close_issue`` instead of
    being merged. Any other result is handed to the merge coordinator, which
    is then waited on (up to 60 seconds) before the queue is updated.

    Args:
        result: Result to merge
    """
    issue_id = result.issue_id

    if not result.should_close:
        coordinator = self.merge_coordinator
        coordinator.queue_merge(result)
        # Block until the coordinator reports completion (or the wait times out)
        coordinator.wait_for_completion(timeout=60)

        if issue_id not in coordinator.merged_ids:
            self._worker_errors[issue_id] = f"Merge failed: {result.error or 'merge error'}"
            self.queue.mark_failed(issue_id)
            return

        self.queue.mark_completed(issue_id)
        self._complete_issue_lifecycle_if_needed(issue_id)
        return

    # Closure path. Imported lazily to avoid a circular dependency.
    from little_loops.issue_lifecycle import close_issue

    issue_info = self._issue_info_by_id.get(issue_id)
    closed = issue_info and close_issue(
        issue_info,
        self.br_config,
        self.logger,
        result.close_reason,
        result.close_status,
        interceptors=None,
    )
    if closed:
        self.queue.mark_completed(issue_id)
    else:
        self._worker_errors[issue_id] = f"Close failed: {result.close_reason or 'close error'}"
        self.queue.mark_failed(issue_id)
def _wait_for_completion(self) -> None:
    """Wait for all workers and merges to complete.

    Polls the worker pool once per second until it reports no active workers.
    If the overall timeout elapses first, a warning is logged and the pool is
    asked to terminate all processes. Pending merges are then waited on for up
    to 120 seconds, after which the merge coordinator's results are copied
    into the queue: merged issues are marked completed (and their lifecycle
    finished), failed merges are recorded in ``_worker_errors`` and marked
    failed.
    """
    self.logger.info("Waiting for workers to complete...")

    # An explicit orchestrator timeout wins; otherwise allow one per-issue
    # timeout for each worker slot.
    config = self.parallel_config
    if config.orchestrator_timeout > 0:
        timeout = config.orchestrator_timeout
    else:
        timeout = config.timeout_per_issue * config.max_workers

    # Measure elapsed time on the monotonic clock so that a wall-clock
    # adjustment can neither trigger the timeout early nor postpone it.
    start = time.monotonic()
    while self.worker_pool.active_count > 0:
        if time.monotonic() - start > timeout:
            self.logger.warning(f"Timeout waiting for workers after {timeout}s")
            self.worker_pool.terminate_all_processes()
            break
        time.sleep(1.0)

    # Wait for merges
    self.logger.info("Waiting for pending merges...")
    self.merge_coordinator.wait_for_completion(timeout=120)

    # Update queue with merge results and complete lifecycle
    for issue_id in self.merge_coordinator.merged_ids:
        self.queue.mark_completed(issue_id)
        self._complete_issue_lifecycle_if_needed(issue_id)

    for issue_id, reason in self.merge_coordinator.failed_merges.items():
        self._worker_errors[issue_id] = reason or "Merge failed"
        self.queue.mark_failed(issue_id)
def _report_results(self, start_time: float) -> None:
    """Report processing results.

    Logs a run summary: total time, completed / failed / interrupted counts,
    an estimated speedup, the failed and interrupted issue IDs, PR-ready
    branches, auto-correction statistics and stash-recovery warnings. The
    elapsed time is also stored in ``self._execution_duration``.

    Args:
        start_time: When processing started (compared against ``time.time()``)
    """
    total_time = time.time() - start_time
    self._execution_duration = total_time

    self.logger.info("")
    self.logger.info("=" * 60)
    if self.wave_label:
        self.logger.info(f"{self.wave_label.upper()} PROCESSING COMPLETE")
    else:
        self.logger.info("PARALLEL ISSUE PROCESSING COMPLETE")
    self.logger.info("=" * 60)
    self.logger.info("")
    self.logger.timing(f"Total time: {format_duration(total_time)}")
    self.logger.info(f"Completed: {self.queue.completed_count}")
    self.logger.info(f"Failed: {self.queue.failed_count}")
    if self._interrupted_issues:
        self.logger.info(f"Interrupted: {len(self._interrupted_issues)}")

    # Copy shared state under the lock so the rest of the report works on a
    # consistent snapshot.
    with self._state_lock:
        timing_snapshot = dict(self.state.timing)
        corrections_snapshot = dict(self.state.corrections)

    # Speedup is only meaningful for a positive elapsed time. The guard also
    # avoids ZeroDivisionError when the elapsed time is zero, and a negative
    # ratio if the wall clock stepped backwards.
    if self.queue.completed_count > 0 and total_time > 0:
        total_issue_time = sum(t.get("total", 0) for t in timing_snapshot.values())
        if total_issue_time > 0:
            speedup = total_issue_time / total_time
            self.logger.info(f"Estimated speedup: {speedup:.2f}x")

    if self.queue.failed_ids:
        self.logger.info("")
        self.logger.warning("Failed issues:")
        for issue_id in self.queue.failed_ids:
            self.logger.warning(f" - {issue_id}")

    # Report interrupted issues separately (ENH-036)
    if self._interrupted_issues:
        self.logger.info("")
        self.logger.info(f"Interrupted: {len(self._interrupted_issues)} (can retry)")
        for issue_id in self._interrupted_issues:
            self.logger.info(f" - {issue_id}")

    # Report PR-ready branches when use_feature_branches=True (ENH-665)
    if self._pr_ready_branches:
        self.logger.info("")
        self.logger.info(f"PR-ready: {len(self._pr_ready_branches)} branch(es)")
        for issue_id, branch in self._pr_ready_branches.items():
            self.logger.info(f" - {issue_id}: {branch}")

    # Report correction statistics for quality tracking (ENH-010)
    if corrections_snapshot:
        total_corrected = len(corrections_snapshot)
        total_issues = self.queue.completed_count + self.queue.failed_count
        correction_rate = (total_corrected / total_issues * 100) if total_issues > 0 else 0
        self.logger.info("")
        self.logger.info(
            f"Auto-corrections: {total_corrected}/{total_issues} ({correction_rate:.1f}%)"
        )

        # Group corrections by category (ENH-010 fourth fix)
        from collections import Counter

        all_corrections: list[str] = []
        by_category: Counter[str] = Counter()
        for corrections in corrections_snapshot.values():
            all_corrections.extend(corrections)
            for correction in corrections:
                # Extract category from [category] prefix if present
                if correction.startswith("[") and "]" in correction:
                    by_category[correction[1 : correction.index("]")]] += 1
                else:
                    by_category["uncategorized"] += 1

        # Log corrections by type/category, most frequent first
        if by_category:
            self.logger.info("Corrections by type:")
            for category, count in by_category.most_common():
                self.logger.info(f" - {category}: {count}")

        # Log most common individual corrections
        if all_corrections:
            self.logger.info("Most common corrections:")
            for correction, count in Counter(all_corrections).most_common(3):
                # Truncate long correction descriptions
                display = correction[:60] + "..." if len(correction) > 60 else correction
                self.logger.info(f" - {display}: {count}")

    # Report stash pop warnings (local changes need manual recovery)
    stash_warnings = self.merge_coordinator.stash_pop_failures
    if stash_warnings:
        self.logger.info("")
        self.logger.warning("Stash recovery warnings (local changes need manual restoration):")
        for issue_id, message in stash_warnings.items():
            self.logger.warning(f" - {issue_id}: {message}")
        self.logger.warning("")
        self.logger.warning(
            "To recover: Run 'git stash list' to find your changes, "
            "then 'git stash pop' or 'git stash apply stash@{N}'"
        )
1196 def _complete_issue_lifecycle_if_needed(self, issue_id: str) -> bool:
1197 """Complete issue lifecycle by writing ``status: done`` to frontmatter.
1199 Args:
1200 issue_id: ID of the issue to complete
1202 Returns:
1203 True if lifecycle was completed (or already complete), False on error
1204 """
1205 info = self._issue_info_by_id.get(issue_id)
1206 if not info:
1207 self.logger.warning(f"No issue info found for {issue_id}")
1208 return False
1210 original_path = info.path
1212 if not original_path.exists():
1213 return True
1215 self.logger.info(f"Completing lifecycle for {issue_id} (frontmatter status update)")
1217 try:
1218 content = original_path.read_text()
1220 # Add resolution section if not already present
1221 if "## Resolution" not in content:
1222 action = self.br_config.get_category_action(info.issue_type)
1223 resolution = f"""
1225---
1227## Resolution
1229- **Action**: {action}
1230- **Completed**: {datetime.now(UTC).strftime("%Y-%m-%d")}
1231- **Status**: Completed (parallel merge fallback)
1232- **Implementation**: Merged from parallel worker branch
1234### Changes Made
1235- See git history for implementation details
1237### Verification Results
1238- Work verification passed before merge
1240### Commits
1241- See `git log --oneline` for merge commit details
1242"""
1243 content += resolution
1245 content = update_frontmatter(
1246 content,
1247 {
1248 "status": "done",
1249 "completed_at": datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ"),
1250 },
1251 )
1252 original_path.write_text(content)
1253 append_session_log_entry(original_path, "ll-parallel")
1255 # Stage and commit
1256 self._git_lock.run(
1257 ["add", "-A"],
1258 cwd=self.repo_path,
1259 )
1261 action = self.br_config.get_category_action(info.issue_type)
1262 commit_msg = f"""{action}({info.issue_type}): complete {issue_id} lifecycle
1264Parallel merge fallback - status: done written to frontmatter.
1266Issue: {issue_id}
1267Type: {info.issue_type}
1268Title: {info.title}
1269"""
1270 commit_result = self._git_lock.run(
1271 ["commit", "-m", commit_msg],
1272 cwd=self.repo_path,
1273 )
1275 if commit_result.returncode != 0:
1276 if "nothing to commit" not in commit_result.stdout.lower():
1277 self.logger.warning(f"git commit failed: {commit_result.stderr}")
1278 else:
1279 commit_hash_match = re.search(r"\[[\w-]+\s+([a-f0-9]+)\]", commit_result.stdout)
1280 if commit_hash_match:
1281 self.logger.success(
1282 f"Completed lifecycle for {issue_id}: {commit_hash_match.group(1)}"
1283 )
1284 else:
1285 self.logger.success(f"Completed lifecycle for {issue_id}")
1287 return True
1289 except Exception as e:
1290 self.logger.error(f"Failed to complete lifecycle for {issue_id}: {e}")
1291 return False
1293 def _cleanup(self) -> None:
1294 """Clean up resources."""
1295 self.logger.info("Cleaning up...")
1297 # Save final state (force=True bypasses throttle to ensure shutdown state is persisted)
1298 self._save_state(force=True)
1300 # Shutdown components
1301 self.worker_pool.shutdown(wait=True)
1302 self.merge_coordinator.shutdown(wait=True, timeout=30)
1304 # Flush transports regardless of interrupt state so events are not lost.
1305 if self._event_bus is not None:
1306 self._event_bus.close_transports()
1308 # Clean up worktrees if not interrupted
1309 if not self._shutdown_requested:
1310 self.worker_pool.cleanup_all_worktrees()