Coverage for little_loops / parallel / orchestrator.py: 84%

643 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""Main orchestrator for parallel issue processing. 

2 

3Coordinates the priority queue, worker pool, and merge coordinator to process 

4multiple issues concurrently. 

5""" 

6 

7from __future__ import annotations 

8 

9import json 

10import os 

11import re 

12import shutil 

13import signal 

14import subprocess 

15import tempfile 

16import threading 

17import time 

18from datetime import UTC, datetime 

19from pathlib import Path 

20from typing import TYPE_CHECKING, Any 

21 

22from little_loops.events import EventBus 

23from little_loops.frontmatter import update_frontmatter 

24from little_loops.issue_parser import IssueInfo 

25from little_loops.logger import Logger, format_duration 

26from little_loops.parallel.git_lock import GitLock 

27from little_loops.parallel.merge_coordinator import MergeCoordinator 

28from little_loops.parallel.overlap_detector import OverlapDetector 

29from little_loops.parallel.priority_queue import IssuePriorityQueue 

30from little_loops.parallel.types import ( 

31 OrchestratorState, 

32 ParallelConfig, 

33 PendingWorktreeInfo, 

34 WorkerResult, 

35) 

36from little_loops.parallel.worker_pool import WorkerPool 

37from little_loops.session_log import append_session_log_entry 

38from little_loops.worktree_utils import _is_ll_worktree 

39 

40if TYPE_CHECKING: 

41 from little_loops.config import BRConfig 

42 

43 

44class ParallelOrchestrator: 

45 """Main controller for parallel issue processing. 

46 

47 Coordinates: 

48 - Issue scanning and prioritization 

49 - Worker dispatch (P0 sequential, P1-P5 parallel) 

50 - Merge coordination 

51 - State persistence for resume capability 

52 - Graceful shutdown on signals 

53 

54 Example: 

55 >>> from little_loops.config import BRConfig 

56 >>> from little_loops.parallel import ParallelConfig, ParallelOrchestrator 

57 >>> br_config = BRConfig(Path.cwd()) 

58 >>> parallel_config = ParallelConfig(max_workers=2) 

59 >>> orchestrator = ParallelOrchestrator(parallel_config, br_config) 

60 >>> exit_code = orchestrator.run() 

61 """ 

62 

def __init__(
    self,
    parallel_config: ParallelConfig,
    br_config: BRConfig,
    repo_path: Path | None = None,
    verbose: bool = True,
    wave_label: str | None = None,
    event_bus: EventBus | None = None,
) -> None:
    """Build the orchestrator and wire up its collaborating components.

    Args:
        parallel_config: Parallel processing configuration
        br_config: Project configuration
        repo_path: Path to the git repository (default: current directory)
        verbose: Whether to output progress messages
        wave_label: Optional label for wave-based execution (e.g., "Wave 1")
        event_bus: Optional EventBus for emitting worker completion events
    """
    # Imported at call time rather than at module import time.
    from little_loops.cli.output import use_color_enabled

    # --- Caller-supplied configuration -------------------------------------
    self.parallel_config = parallel_config
    self.br_config = br_config
    self.repo_path = repo_path or Path.cwd()
    self.logger = Logger(verbose=verbose, use_color=use_color_enabled())
    self.wave_label = wave_label
    self._event_bus = event_bus
    self._execution_duration: float = 0.0

    # --- Components ----------------------------------------------------------
    # One git lock is shared by the worker pool and the merge coordinator so
    # that operations on the main repo are serialized (prevents index.lock
    # races between workers and merges).
    self._git_lock = GitLock(self.logger)
    self.queue = IssuePriorityQueue()
    self.worker_pool = WorkerPool(
        parallel_config,
        br_config,
        self.logger,
        self.repo_path,
        self._git_lock,
    )
    self.merge_coordinator = MergeCoordinator(
        parallel_config,
        self.logger,
        self.repo_path,
        self._git_lock,
    )

    # --- Run state -----------------------------------------------------------
    self.state = OrchestratorState()
    self._state_lock = threading.Lock()
    self._shutdown_requested = False
    # Handlers that were installed before ours; None until captured.
    self._original_sigint: Any = None
    self._original_sigterm: Any = None

    # --- Per-issue bookkeeping -----------------------------------------------
    # Issue info kept for lifecycle completion after merge.
    self._issue_info_by_id: dict[str, IssueInfo] = {}
    # Interrupted issues are tracked apart from failures (ENH-036).
    self._interrupted_issues: list[str] = []
    # Per-issue failure reasons written into the state file (BUG-1383).
    self._worker_errors: dict[str, str] = {}
    # issue_id -> branch_name for PR-ready branches when
    # use_feature_branches=True (ENH-665).
    self._pr_ready_branches: dict[str, str] = {}

    # --- Overlap detection (ENH-143) -----------------------------------------
    self.overlap_detector: OverlapDetector | None = None
    if parallel_config.overlap_detection:
        self.overlap_detector = OverlapDetector(config=br_config.dependency_mapping)
    # Deferred issues are re-checked after active issues complete.
    self._deferred_issues: list[IssueInfo] = []

    # --- Throttling timestamps -----------------------------------------------
    # Last status report time/line, for progress visibility (ENH-262).
    self._last_status_time: float = 0.0
    self._last_status_line: str = ""
    # Last state save time, used to throttle disk writes (ENH-485).
    self._last_save_time: float = 0.0

134 

@property
def execution_duration(self) -> float:
    """Execution duration in seconds, as currently stored on this instance."""
    duration = self._execution_duration
    return duration

139 

def run(self) -> int:
    """Run the parallel issue processor.

    Installs signal handlers, makes sure the parallel artifacts are
    git-ignored, deals with worktrees left over from earlier runs, loads any
    saved state, and then either previews (dry run) or executes the work.

    Returns:
        Exit code (0 = success, 1 = failure). ``KeyboardInterrupt`` and any
        other exception raised during the run are logged and reported as 1.
    """
    try:
        self._setup_signal_handlers()
        self._ensure_gitignore_entries()

        # Check for pending work from previous runs (unless clean start)
        if not self.parallel_config.clean_start:
            pending_worktrees = self._check_pending_worktrees()

            # Only act when at least one leftover worktree holds real work.
            if any(p.has_pending_work for p in pending_worktrees):
                if self.parallel_config.merge_pending:
                    self._merge_pending_worktrees(pending_worktrees)
                elif not self.parallel_config.ignore_pending:
                    # Default behavior: just report (cleanup happens below)
                    self.logger.info(
                        "Continuing with cleanup (use --merge-pending to merge)..."
                    )

        self._cleanup_orphaned_worktrees()
        self._load_state()

        if self.parallel_config.dry_run:
            return self._dry_run()

        return self._execute()

    except KeyboardInterrupt:
        self.logger.warning("Interrupted by user")
        return 1
    except Exception as e:
        self.logger.error(f"Fatal error: {e}")
        return 1
    finally:
        # Restore the previous signal handlers even if cleanup itself fails;
        # otherwise the process would keep this orchestrator's handlers.
        try:
            self._cleanup()
        finally:
            self._restore_signal_handlers()

182 

183 def _setup_signal_handlers(self) -> None: 

184 """Setup signal handlers for graceful shutdown.""" 

185 self._original_sigint = signal.signal(signal.SIGINT, self._signal_handler) 

186 self._original_sigterm = signal.signal(signal.SIGTERM, self._signal_handler) 

187 

188 def _restore_signal_handlers(self) -> None: 

189 """Restore original signal handlers.""" 

190 if self._original_sigint is not None: 

191 signal.signal(signal.SIGINT, self._original_sigint) 

192 if self._original_sigterm is not None: 

193 signal.signal(signal.SIGTERM, self._original_sigterm) 

194 

195 def _ensure_gitignore_entries(self) -> None: 

196 """Ensure .gitignore has entries for parallel processing artifacts. 

197 

198 Adds entries for: 

199 - .parallel-manage-state.json (state file) 

200 - .worktrees/ (git worktree directory) 

201 

202 This prevents these files from being tracked by git, which would cause 

203 conflicts during merge operations (state file is continuously updated). 

204 """ 

205 gitignore_path = self.repo_path / ".gitignore" 

206 required_entries = [ 

207 ".parallel-manage-state.json", 

208 ".worktrees/", 

209 ] 

210 

211 existing_content = "" 

212 if gitignore_path.exists(): 

213 existing_content = gitignore_path.read_text() 

214 

215 # Check which entries are missing 

216 missing_entries = [] 

217 for entry in required_entries: 

218 # Check for exact match or pattern that would cover it 

219 if entry not in existing_content: 

220 missing_entries.append(entry) 

221 

222 if not missing_entries: 

223 return 

224 

225 # Append missing entries 

226 addition = "\n# ll-parallel artifacts\n" 

227 for entry in missing_entries: 

228 addition += f"{entry}\n" 

229 

230 # Ensure file ends with newline before adding 

231 if existing_content and not existing_content.endswith("\n"): 

232 addition = "\n" + addition 

233 

234 gitignore_path.write_text(existing_content + addition) 

235 self.logger.info(f"Added {len(missing_entries)} entries to .gitignore") 

236 

def _cleanup_orphaned_worktrees(self) -> None:
    """Clean up worktrees from previous interrupted runs.

    Scans the worktree base directory and removes any worktrees that are
    not from the current session. This handles cases where a previous run
    was interrupted (Ctrl+C) and worktrees were not cleaned up.

    Worktrees carrying a ``.ll-session-<pid>`` marker whose pid belongs to a
    running process are skipped (BUG-579).
    """
    worktree_base = self.repo_path / self.parallel_config.worktree_base
    if not worktree_base.exists():
        return

    # Get list of worktree directories, skipping those owned by live processes (BUG-579)
    orphaned = []
    for item in worktree_base.iterdir():
        if not (item.is_dir() and _is_ll_worktree(item.name)):
            continue

        # Check for a .ll-session-<pid> marker left by an active orchestrator
        owned_by_live = False
        for marker in item.glob(".ll-session-*"):
            try:
                pid = int(marker.name.split("-")[-1])
                os.kill(pid, 0)  # Signal 0: check if process exists
            except (ProcessLookupError, ValueError):
                # Stale or malformed marker: look at the next one.
                continue
            except PermissionError:
                pass  # Process exists, no permission to signal
            owned_by_live = True
            break

        if owned_by_live:
            self.logger.info(f"Skipping {item.name}: owned by running process")
            continue
        orphaned.append(item)

    if orphaned:
        self.logger.info(f"Cleaning up {len(orphaned)} orphaned worktree(s) from previous run")

    for worktree_path in orphaned:
        try:
            # Read the branch name BEFORE removing the worktree: once the
            # directory is gone it can no longer be used as the working
            # directory for git, and the branch would never be deleted.
            branch_name = None
            try:
                branch_result = subprocess.run(
                    ["git", "rev-parse", "--abbrev-ref", "HEAD"],
                    cwd=worktree_path,
                    capture_output=True,
                    text=True,
                )
            except OSError as e:
                self.logger.warning(
                    f"Could not determine branch for {worktree_path.name}: {e}"
                )
            else:
                if branch_result.returncode == 0:
                    branch_name = branch_result.stdout.strip()

            self._git_lock.run(
                ["worktree", "unlock", str(worktree_path)],
                cwd=self.repo_path,
                timeout=10,
            )
            # Try git worktree remove first
            self._git_lock.run(
                ["worktree", "remove", "--force", str(worktree_path)],
                cwd=self.repo_path,
                timeout=30,
            )

            # If git worktree remove failed, force delete the directory
            if worktree_path.exists():
                shutil.rmtree(worktree_path, ignore_errors=True)

            # Delete the associated branch, but only branches in the
            # "parallel/" namespace.
            if branch_name and branch_name.startswith("parallel/"):
                self._git_lock.run(
                    ["branch", "-D", branch_name],
                    cwd=self.repo_path,
                    timeout=10,
                )
        except Exception as e:
            self.logger.warning(f"Failed to clean up {worktree_path.name}: {e}")

    # Also prune git worktree references
    self._git_lock.run(
        ["worktree", "prune"],
        cwd=self.repo_path,
        timeout=30,
    )

    self._prune_ghost_worktree_refs()

318 

def _prune_ghost_worktree_refs(self) -> None:
    """Prune git worktree metadata entries whose on-disk path no longer exists.

    Handles the SIGKILL race where a worker directory was deleted before
    git worktree prune ran, leaving .git/worktrees/<name>/ intact. The
    next git worktree add for the same path would then fail with "already exists".

    Failures to list or prune are logged as warnings and otherwise ignored.
    """
    try:
        result = self._git_lock.run(
            ["worktree", "list", "--porcelain"],
            cwd=self.repo_path,
            timeout=30,
        )
    except Exception as e:
        self.logger.warning(f"Failed to list worktrees for ghost ref scan: {e}")
        return

    ghost_names: list[str] = []
    current: dict[str, str] = {}
    # Records are separated by blank lines. The trailing "" sentinel flushes
    # the final record even when the output does not end with a blank line.
    for line in [*result.stdout.splitlines(), ""]:
        if line:
            key, _, value = line.partition(" ")
            current[key] = value
            continue
        if current:
            path_str = current.get("worktree", "")
            name = Path(path_str).name
            if _is_ll_worktree(name) and path_str and not Path(path_str).exists():
                ghost_names.append(name)
        current = {}

    if not ghost_names:
        return

    try:
        self._git_lock.run(
            ["worktree", "prune"],
            cwd=self.repo_path,
            timeout=30,
        )
    except Exception as e:
        self.logger.warning(f"Failed to prune ghost worktree refs: {e}")
        return

    # Report only after the prune command has actually been issued.
    for name in ghost_names:
        self.logger.info(f"Pruned ghost ref: {name}")

369 

def _inspect_worktree(self, worktree_path: Path) -> PendingWorktreeInfo | None:
    """Inspect a worktree to determine its status.

    Args:
        worktree_path: Path to the worktree directory

    Returns:
        PendingWorktreeInfo if inspection succeeded, None if failed
        (any exception is logged as a warning).
    """
    try:
        # Read actual branch name from worktree via rev-parse
        branch_result = subprocess.run(
            ["git", "rev-parse", "--abbrev-ref", "HEAD"],
            cwd=worktree_path,
            capture_output=True,
            text=True,
        )
        if branch_result.returncode == 0:
            branch_name = branch_result.stdout.strip()
        else:
            # Fall back to string derivation if rev-parse fails
            branch_name = worktree_path.name.replace("worker-", "parallel/")

        # Extract issue ID (e.g., bug-045 -> BUG-045)
        # Pattern: worker-<issue-id>-<timestamp>
        match = re.match(r"worker-([a-z]+-\d+)-\d{8}-\d{6}", worktree_path.name)
        issue_id = match.group(1).upper() if match else worktree_path.name

        # Check commits ahead of the configured base branch
        result = self._git_lock.run(
            ["rev-list", "--count", f"{self.parallel_config.base_branch}..{branch_name}"],
            cwd=self.repo_path,
            timeout=10,
        )
        commits_ahead = int(result.stdout.strip()) if result.returncode == 0 else 0

        # Check for uncommitted changes in worktree
        result = self._git_lock.run(
            ["status", "--porcelain"],
            cwd=worktree_path,
            timeout=10,
        )
        # Each porcelain line is "XY <path>", and X may be a space (e.g.
        # " M file.py"). Split the raw output instead of stripping it as a
        # whole, so the first line keeps its status columns and the path
        # slice below stays aligned.
        status_lines: list[str] = []
        if result.returncode == 0:
            status_lines = [line for line in result.stdout.splitlines() if line.strip()]
        has_uncommitted = bool(status_lines)
        changed_files = [line[3:] for line in status_lines]

        return PendingWorktreeInfo(
            worktree_path=worktree_path,
            branch_name=branch_name,
            issue_id=issue_id,
            commits_ahead=commits_ahead,
            has_uncommitted_changes=has_uncommitted,
            changed_files=changed_files,
        )
    except Exception as e:
        self.logger.warning(f"Failed to inspect worktree {worktree_path.name}: {e}")
        return None

429 

430 def _check_pending_worktrees(self) -> list[PendingWorktreeInfo]: 

431 """Check for pending worktrees from previous runs and report status. 

432 

433 Returns: 

434 List of pending worktree information 

435 """ 

436 worktree_base = self.repo_path / self.parallel_config.worktree_base 

437 if not worktree_base.exists(): 

438 return [] 

439 

440 # Find all worker directories 

441 worktrees = [ 

442 item for item in worktree_base.iterdir() if item.is_dir() and _is_ll_worktree(item.name) 

443 ] 

444 

445 if not worktrees: 

446 return [] 

447 

448 self.logger.info("Checking for pending work from previous runs...") 

449 

450 # Inspect each worktree 

451 pending_info: list[PendingWorktreeInfo] = [] 

452 for worktree_path in worktrees: 

453 info = self._inspect_worktree(worktree_path) 

454 if info: 

455 pending_info.append(info) 

456 

457 # Report findings 

458 with_work = [p for p in pending_info if p.has_pending_work] 

459 if with_work: 

460 self.logger.warning(f"Found {len(with_work)} worktree(s) with pending work:") 

461 for info in with_work: 

462 status_parts = [] 

463 if info.commits_ahead > 0: 

464 status_parts.append(f"{info.commits_ahead} commit(s) ahead of main") 

465 if info.has_uncommitted_changes: 

466 status_parts.append(f"{len(info.changed_files)} uncommitted file(s)") 

467 status = ", ".join(status_parts) 

468 self.logger.warning(f" - {info.worktree_path.name}: {info.issue_id} ({status})") 

469 

470 self.logger.info("") 

471 self.logger.info("Options:") 

472 self.logger.info(" --merge-pending Attempt to merge pending work before continuing") 

473 self.logger.info(" --clean-start Remove all worktrees and start fresh") 

474 self.logger.info( 

475 " --ignore-pending Continue without action (worktrees will be cleaned up)" 

476 ) 

477 elif pending_info: 

478 self.logger.info(f"Found {len(pending_info)} orphaned worktree(s) with no pending work") 

479 

480 return pending_info 

481 

482 def _merge_pending_worktrees(self, pending: list[PendingWorktreeInfo]) -> None: 

483 """Attempt to merge pending worktrees from previous runs. 

484 

485 Args: 

486 pending: List of pending worktree information 

487 """ 

488 with_work = [p for p in pending if p.has_pending_work] 

489 if not with_work: 

490 return 

491 

492 self.logger.info(f"Attempting to merge {len(with_work)} pending worktree(s)...") 

493 

494 for info in with_work: 

495 try: 

496 # If there are uncommitted changes, commit them first 

497 if info.has_uncommitted_changes: 

498 self.logger.info(f" Committing uncommitted changes in {info.issue_id}...") 

499 self._git_lock.run( 

500 ["add", "-A"], 

501 cwd=info.worktree_path, 

502 timeout=30, 

503 ) 

504 self._git_lock.run( 

505 [ 

506 "commit", 

507 "-m", 

508 f"WIP: Auto-commit from interrupted session for {info.issue_id}", 

509 ], 

510 cwd=info.worktree_path, 

511 timeout=30, 

512 ) 

513 

514 # Attempt merge 

515 self.logger.info(f" Merging {info.issue_id} ({info.branch_name})...") 

516 result = self._git_lock.run( 

517 [ 

518 "merge", 

519 "--no-ff", 

520 info.branch_name, 

521 "-m", 

522 f"Merge pending work for {info.issue_id}", 

523 ], 

524 cwd=self.repo_path, 

525 timeout=60, 

526 ) 

527 

528 if result.returncode == 0: 

529 self.logger.success(f" Successfully merged {info.issue_id}") 

530 # Clean up the worktree after successful merge 

531 self._git_lock.run( 

532 ["worktree", "remove", "--force", str(info.worktree_path)], 

533 cwd=self.repo_path, 

534 timeout=30, 

535 ) 

536 self._git_lock.run( 

537 ["branch", "-D", info.branch_name], 

538 cwd=self.repo_path, 

539 timeout=10, 

540 ) 

541 else: 

542 self.logger.warning(f" Failed to merge {info.issue_id}: {result.stderr}") 

543 # Abort the merge if it failed 

544 self._git_lock.run( 

545 ["merge", "--abort"], 

546 cwd=self.repo_path, 

547 timeout=10, 

548 ) 

549 

550 except Exception as e: 

551 self.logger.warning(f" Error merging {info.issue_id}: {e}") 

552 

553 def _signal_handler(self, signum: int, frame: object) -> None: 

554 """Handle shutdown signals gracefully.""" 

555 self._shutdown_requested = True 

556 # Propagate to worker pool for interrupted worker detection (ENH-036) 

557 self.worker_pool.set_shutdown_requested(True) 

558 self.logger.warning(f"Received signal {signum}, shutting down gracefully...") 

559 

560 def _load_state(self) -> None: 

561 """Load state from file for resume capability.""" 

562 if self.parallel_config.clean_start: 

563 self.state.started_at = datetime.now().isoformat() 

564 return 

565 state_file = self.repo_path / self.parallel_config.state_file 

566 if not state_file.exists(): 

567 self.state.started_at = datetime.now().isoformat() 

568 return 

569 

570 try: 

571 data = json.loads(state_file.read_text()) 

572 self.state = OrchestratorState.from_dict(data) 

573 

574 # Restore queue state 

575 self.queue.load_completed(self.state.completed_issues) 

576 self.queue.load_failed(self.state.failed_issues.keys()) 

577 

578 self.logger.info( 

579 f"Resumed from previous state: " 

580 f"{len(self.state.completed_issues)} completed, " 

581 f"{len(self.state.failed_issues)} failed" 

582 ) 

583 except Exception as e: 

584 self.logger.warning(f"Could not load state: {e}") 

585 self.state.started_at = datetime.now().isoformat() 

586 

587 def _save_state(self, force: bool = False) -> None: 

588 """Save current state to file using an atomic write. 

589 

590 Writes are throttled to at most once every 5 seconds to reduce filesystem I/O 

591 during high-frequency loop ticks (e.g., merge-waiting phase). Pass force=True 

592 to bypass the throttle, e.g., on shutdown. 

593 """ 

594 now = time.time() 

595 if not force and now - self._last_save_time < 5.0: 

596 return 

597 self._last_save_time = now 

598 with self._state_lock: 

599 self.state.last_checkpoint = datetime.now().isoformat() 

600 self.state.completed_issues = self.queue.completed_ids 

601 self.state.failed_issues = { 

602 issue_id: self._worker_errors.get(issue_id, "Failed") 

603 for issue_id in self.queue.failed_ids 

604 } 

605 self.state.in_progress_issues = self.queue.in_progress_ids 

606 

607 state_file = self.repo_path / self.parallel_config.state_file 

608 data = json.dumps(self.state.to_dict(), indent=2) 

609 tmp_fd, tmp_path = tempfile.mkstemp(dir=state_file.parent, suffix=".tmp") 

610 try: 

611 with os.fdopen(tmp_fd, "w") as f: 

612 f.write(data) 

613 os.replace(tmp_path, state_file) 

614 except Exception: 

615 os.unlink(tmp_path) 

616 raise 

617 

618 def _cleanup_state(self) -> None: 

619 """Remove state file on successful completion.""" 

620 state_file = self.repo_path / self.parallel_config.state_file 

621 if state_file.exists(): 

622 state_file.unlink() 

623 

624 def _dry_run(self) -> int: 

625 """Preview what would be processed without executing. 

626 

627 Returns: 

628 Exit code (always 0 for dry run) 

629 """ 

630 issues = self._scan_issues() 

631 

632 self.logger.info("=" * 60) 

633 self.logger.info("DRY RUN - No changes will be made") 

634 self.logger.info("=" * 60) 

635 self.logger.info("") 

636 

637 if not issues: 

638 self.logger.info("No issues found matching criteria") 

639 return 0 

640 

641 self.logger.info(f"Found {len(issues)} issues to process:") 

642 self.logger.info("") 

643 

644 # Group by priority 

645 by_priority: dict[str, list[IssueInfo]] = {} 

646 for issue in issues: 

647 by_priority.setdefault(issue.priority, []).append(issue) 

648 

649 for priority in IssuePriorityQueue.DEFAULT_PRIORITIES: 

650 if priority not in by_priority: 

651 continue 

652 

653 priority_issues = by_priority[priority] 

654 self.logger.info(f" {priority} ({len(priority_issues)} issues):") 

655 for issue in priority_issues: 

656 mode = ( 

657 "sequential" 

658 if priority == "P0" and self.parallel_config.p0_sequential 

659 else "parallel" 

660 ) 

661 self.logger.info(f" - {issue.issue_id}: {issue.title} [{mode}]") 

662 

663 self.logger.info("") 

664 self.logger.info("Configuration:") 

665 self.logger.info(f" Workers: {self.parallel_config.max_workers}") 

666 self.logger.info(f" P0 Sequential: {self.parallel_config.p0_sequential}") 

667 self.logger.info(f" Max Issues: {self.parallel_config.max_issues or 'unlimited'}") 

668 self.logger.info(f" Command Prefix: {self.parallel_config.command_prefix}") 

669 

670 return 0 

671 

672 def _maybe_report_status(self) -> None: 

673 """Report status if enough time has elapsed since last report. 

674 

675 Reports every 5 seconds during active processing for progress visibility (ENH-262). 

676 Suppresses duplicate lines when nothing has changed. 

677 """ 

678 now = time.time() 

679 # Report every 5 seconds 

680 if now - self._last_status_time < 5.0: 

681 return 

682 

683 self._last_status_time = now 

684 

685 # Build status line 

686 parts = [] 

687 

688 # Add wave label if present 

689 if self.wave_label: 

690 parts.append(f"{self.wave_label}") 

691 

692 # Get queue counts 

693 in_progress = len(self.queue.in_progress_ids) 

694 completed = self.queue.completed_count 

695 failed = self.queue.failed_count 

696 pending_merge = self.merge_coordinator.pending_count 

697 

698 parts.append(f"Active: {in_progress}") 

699 parts.append(f"Done: {completed}") 

700 if failed > 0: 

701 parts.append(f"Failed: {failed}") 

702 if pending_merge > 0: 

703 parts.append(f"Merging: {pending_merge}") 

704 

705 # Build status line 

706 status = " | ".join(parts) 

707 

708 # Get active worker stages 

709 active_stages = self.worker_pool.get_active_stages() 

710 

711 # Add worker details if any are active 

712 if active_stages: 

713 # Group by stage 

714 by_stage: dict[str, list[str]] = {} 

715 for issue_id, worker_stage in active_stages.items(): 

716 stage_name = worker_stage.value.title() 

717 by_stage.setdefault(stage_name, []).append(issue_id) 

718 

719 stage_parts = [] 

720 for stage_name in ["Validating", "Implementing", "Verifying", "Merging"]: 

721 if stage_name in by_stage: 

722 issue_ids = ", ".join(by_stage[stage_name]) 

723 stage_parts.append(f"{stage_name}: [{issue_ids}]") 

724 

725 if stage_parts: 

726 status += " | " + " | ".join(stage_parts) 

727 

728 # Skip if nothing changed since last report 

729 if status == self._last_status_line: 

730 return 

731 self._last_status_line = status 

732 

733 # Log with gray color to distinguish from normal logs 

734 self.logger.debug(status) 

735 

736 def _execute(self) -> int: 

737 """Execute parallel issue processing. 

738 

739 Returns: 

740 Exit code (0 = success, 1 = failure) 

741 """ 

742 start_time = time.time() 

743 

744 # Scan and queue issues 

745 issues = self._scan_issues() 

746 if not issues: 

747 self.logger.info("No issues to process") 

748 return 0 

749 

750 # Store issue info for lifecycle completion after merge 

751 for issue in issues: 

752 self._issue_info_by_id[issue.issue_id] = issue 

753 

754 added = self.queue.add_many(issues) 

755 self.logger.info(f"Queued {added} issues for processing") 

756 

757 # Start components 

758 self.worker_pool.start() 

759 self.merge_coordinator.start() 

760 

761 # Process issues 

762 issues_processed = 0 

763 max_issues = self.parallel_config.max_issues or float("inf") 

764 

765 while not self._shutdown_requested: 

766 # Check if done 

767 if self.queue.empty() and self.worker_pool.active_count == 0: 

768 # Wait for pending merges 

769 if self.merge_coordinator.pending_count == 0: 

770 break 

771 

772 # Check max issues limit 

773 if issues_processed >= max_issues: 

774 self.logger.info(f"Reached max issues limit ({max_issues})") 

775 break 

776 

777 # Get next issue if workers available 

778 if self.worker_pool.active_count < self.parallel_config.max_workers: 

779 queued = self.queue.get(block=False) 

780 if queued: 

781 issue = queued.issue_info 

782 

783 # P0 sequential processing 

784 if issue.priority == "P0" and self.parallel_config.p0_sequential: 

785 self._process_sequential(issue) 

786 else: 

787 self._process_parallel(issue) 

788 

789 issues_processed += 1 

790 

791 # Save state periodically 

792 self._save_state() 

793 

794 # Report status periodically for progress visibility (ENH-262) 

795 self._maybe_report_status() 

796 

797 # Small sleep to prevent busy loop 

798 time.sleep(0.1) 

799 

800 # Wait for completion 

801 self._wait_for_completion() 

802 

803 # Report results 

804 self._report_results(start_time) 

805 

806 # Cleanup state on success 

807 if not self._shutdown_requested and self.queue.failed_count == 0: 

808 self._cleanup_state() 

809 

810 return 0 if self.queue.failed_count == 0 else 1 

811 

def _scan_issues(self) -> list[IssueInfo]:
    """Collect the issues to work on, honoring the configured filters.

    Issues already recorded as completed or failed in the loaded state are
    skipped, as are any ids listed in the configuration's ``skip_ids``.

    Returns:
        Issues as returned by ``IssuePriorityQueue.scan_issues``, truncated
        to ``max_issues`` when that limit is positive
    """
    config = self.parallel_config

    # Ids to leave out: prior results from the state plus configured skips.
    excluded = {*self.state.completed_issues, *self.state.failed_issues.keys()}
    if config.skip_ids:
        excluded |= config.skip_ids

    found = IssuePriorityQueue.scan_issues(
        self.br_config,
        priority_filter=list(config.priority_filter),
        skip_ids=excluded,
        only_ids=config.only_ids,
        type_prefixes=config.type_prefixes,
        label_filter=config.label_filter,
    )

    # Apply max issues limit
    limit = config.max_issues
    if limit > 0:
        found = found[:limit]

    return found

837 

838 def _process_sequential(self, issue: IssueInfo) -> None: 

839 """Process an issue sequentially (blocking). 

840 

841 Args: 

842 issue: Issue to process 

843 """ 

844 self.logger.info(f"Processing {issue.issue_id} sequentially (P0)") 

845 

846 # Wait for any parallel work to finish 

847 while self.worker_pool.active_count > 0: 

848 time.sleep(0.5) 

849 

850 # Process in main repo (no worktree isolation needed) 

851 # Note: No callback here - _merge_sequential handles the result explicitly 

852 # to avoid double-processing (callback would also queue merge/close) 

853 future = self.worker_pool.submit(issue) 

854 

855 # Wait for completion 

856 try: 

857 result = future.result(timeout=self.parallel_config.timeout_per_issue) 

858 if result.success: 

859 # Merge immediately for P0 

860 self._merge_sequential(result) 

861 except Exception as e: 

862 self.logger.error(f"Sequential processing failed: {e}") 

863 self.queue.mark_failed(issue.issue_id) 

864 

865 def _process_parallel(self, issue: IssueInfo) -> None: 

866 """Process an issue in parallel (non-blocking). 

867 

868 Args: 

869 issue: Issue to process 

870 """ 

871 # Check for overlaps if enabled (ENH-143) 

872 if self.overlap_detector: 

873 overlap = self.overlap_detector.check_overlap(issue) 

874 if overlap: 

875 if self.parallel_config.serialize_overlapping: 

876 self.logger.warning( 

877 f"Deferring {issue.issue_id} - overlaps with {overlap.overlapping_issues}" 

878 ) 

879 # Track for re-check when active issues complete 

880 self._deferred_issues.append(issue) 

881 return 

882 else: 

883 self.logger.warning( 

884 f"Warning: {issue.issue_id} may conflict with {overlap.overlapping_issues}" 

885 ) 

886 

887 # Register as active before dispatch 

888 self.overlap_detector.register_issue(issue) 

889 

890 self.logger.info(f"Dispatching {issue.issue_id} to worker pool") 

891 self.worker_pool.submit(issue, self._on_worker_complete) 

892 

893 def _on_worker_complete(self, result: WorkerResult) -> None: 

894 """Callback when a worker completes. 

895 

896 Args: 

897 result: Result from the worker 

898 """ 

899 # Unregister from overlap tracking (ENH-143) 

900 if self.overlap_detector: 

901 self.overlap_detector.unregister_issue(result.issue_id) 

902 # Re-queue deferred issues that were waiting on this one 

903 self._requeue_deferred_issues() 

904 

905 # Handle interrupted workers (not counted as failed) - ENH-036 

906 if result.interrupted: 

907 self.logger.info(f"{result.issue_id} was interrupted during shutdown (can retry)") 

908 self._interrupted_issues.append(result.issue_id) 

909 # Don't mark as failed - they can be retried on next run 

910 return 

911 

912 # Handle issue closure (no merge needed) 

913 if result.should_close: 

914 # Lazy import to avoid circular dependency 

915 from little_loops.issue_lifecycle import close_issue 

916 

917 self.logger.info(f"{result.issue_id} should be closed: {result.close_status}") 

918 info = self._issue_info_by_id.get(result.issue_id) 

919 if info: 

920 if close_issue( 

921 info, 

922 self.br_config, 

923 self.logger, 

924 result.close_reason, 

925 result.close_status, 

926 interceptors=None, 

927 ): 

928 self.queue.mark_completed(result.issue_id) 

929 else: 

930 self._worker_errors[result.issue_id] = ( 

931 f"Close failed: {result.close_reason or 'close error'}" 

932 ) 

933 self.queue.mark_failed(result.issue_id) 

934 else: 

935 self.logger.warning(f"No issue info found for {result.issue_id}") 

936 self._worker_errors[result.issue_id] = "Close failed: no issue info" 

937 self.queue.mark_failed(result.issue_id) 

938 elif result.success: 

939 self.logger.success( 

940 f"{result.issue_id} completed in {format_duration(result.duration)}" 

941 ) 

942 if result.was_corrected: 

943 self.logger.info(f"{result.issue_id} was auto-corrected during validation") 

944 # Log and store corrections for pattern analysis (ENH-010) 

945 for correction in result.corrections: 

946 self.logger.info(f" Correction: {correction}") 

947 if result.corrections: 

948 with self._state_lock: 

949 self.state.corrections[result.issue_id] = result.corrections 

950 if self.parallel_config.use_feature_branches: 

951 # Feature branch mode: skip auto-merge, branch stays alive for a PR (ENH-665) 

952 self.logger.info(f"{result.issue_id}: feature branch ready — {result.branch_name}") 

953 self.queue.mark_completed(result.issue_id) 

954 self._complete_issue_lifecycle_if_needed(result.issue_id) 

955 with self._state_lock: 

956 self._pr_ready_branches[result.issue_id] = result.branch_name 

957 else: 

958 self.merge_coordinator.queue_merge(result) 

959 # Wait for merge to complete before returning from callback. 

960 # This prevents dispatch of next worker while merge is in progress, 

961 # avoiding race conditions between worktree creation and merge ops. 

962 # (BUG-140: Race condition between worktree creation and merge) 

963 self.merge_coordinator.wait_for_completion(timeout=120) 

964 if result.issue_id in self.merge_coordinator.merged_ids: 

965 self.queue.mark_completed(result.issue_id) 

966 self._complete_issue_lifecycle_if_needed(result.issue_id) 

967 else: 

968 self._worker_errors[result.issue_id] = ( 

969 f"Merge failed: {result.error or 'merge error'}" 

970 ) 

971 self.queue.mark_failed(result.issue_id) 

972 else: 

973 self.logger.error(f"{result.issue_id} failed: {result.error}") 

974 self._worker_errors[result.issue_id] = result.error or "Failed" 

975 self.queue.mark_failed(result.issue_id) 

976 

977 # Update timing 

978 with self._state_lock: 

979 self.state.timing[result.issue_id] = { 

980 "total": result.duration, 

981 } 

982 

983 # Clean up stage tracking after callback completes (ENH-262) 

984 # Delay briefly so status reporter can show completion 

985 self.worker_pool.remove_worker_stage(result.issue_id) 

986 

987 # Emit worker completion event for extensions (ENH-921) 

988 if self._event_bus: 

989 self._event_bus.emit( 

990 { 

991 "event": "parallel.worker_completed", 

992 "ts": datetime.now(UTC).isoformat(), 

993 "issue_id": result.issue_id, 

994 "worker_name": result.worktree_path.name, 

995 "status": "success" if result.success else "failure", 

996 "duration_seconds": result.duration, 

997 } 

998 ) 

999 

def _requeue_deferred_issues(self) -> None:
    """Re-queue deferred issues that no longer have overlaps (ENH-143).

    Each deferred issue is re-checked against the overlap detector. Issues
    that still overlap stay deferred, in their original order; the rest are
    logged and added back to the queue.

    If no overlap detector is configured, nothing can overlap, so every
    deferred issue is re-queued rather than being silently dropped.
    """
    if not self._deferred_issues:
        return

    detector = self.overlap_detector
    still_deferred = []
    for issue in self._deferred_issues:
        if detector and detector.check_overlap(issue):
            # Still overlaps with an active issue: keep waiting
            still_deferred.append(issue)
            continue

        # No remaining overlap (or no detector at all): hand back to the queue
        self.logger.info(f"Re-queuing {issue.issue_id} - no longer overlapping")
        self.queue.add(issue)

    self._deferred_issues = still_deferred

1019 

def _merge_sequential(self, result: WorkerResult) -> None:
    """Settle a sequential (P0) result right away.

    A result flagged ``should_close`` is closed instead of merged. Any other
    result is queued for merge and waited on (up to 60 seconds); the issue is
    then marked completed if its ID appears in the coordinator's merged IDs,
    otherwise it is marked failed.

    Args:
        result: Result to merge
    """
    issue_id = result.issue_id

    if not result.should_close:
        coordinator = self.merge_coordinator
        coordinator.queue_merge(result)
        # Wait for this merge to settle before reading its outcome
        coordinator.wait_for_completion(timeout=60)

        if issue_id in coordinator.merged_ids:
            self.queue.mark_completed(issue_id)
            self._complete_issue_lifecycle_if_needed(issue_id)
        else:
            self._worker_errors[issue_id] = f"Merge failed: {result.error or 'merge error'}"
            self.queue.mark_failed(issue_id)
        return

    # Closure path for sequential issues.
    # Imported lazily to avoid a circular dependency.
    from little_loops.issue_lifecycle import close_issue

    info = self._issue_info_by_id.get(issue_id)
    closed = info and close_issue(
        info,
        self.br_config,
        self.logger,
        result.close_reason,
        result.close_status,
        interceptors=None,
    )
    if closed:
        self.queue.mark_completed(issue_id)
    else:
        # Covers both a missing issue record and a failed close
        self._worker_errors[issue_id] = f"Close failed: {result.close_reason or 'close error'}"
        self.queue.mark_failed(issue_id)

1058 

def _wait_for_completion(self) -> None:
    """Block until workers drain (or time out), then settle merge results.

    The wait budget is ``orchestrator_timeout`` when that is positive,
    otherwise ``timeout_per_issue * max_workers``. The worker pool is polled
    once per second; if the budget is exceeded, all worker processes are
    terminated. Pending merges are then awaited (up to 120 seconds) and
    their outcomes copied into the queue.
    """
    self.logger.info("Waiting for workers to complete...")

    config = self.parallel_config
    budget = (
        config.orchestrator_timeout
        if config.orchestrator_timeout > 0
        else config.timeout_per_issue * config.max_workers
    )

    began = time.time()
    while self.worker_pool.active_count > 0:
        out_of_time = time.time() - began > budget
        if out_of_time:
            self.logger.warning(f"Timeout waiting for workers after {budget}s")
            self.worker_pool.terminate_all_processes()
            break
        time.sleep(1.0)

    self.logger.info("Waiting for pending merges...")
    coordinator = self.merge_coordinator
    coordinator.wait_for_completion(timeout=120)

    # Merged issues are completed in the queue and get their lifecycle finished
    for merged_id in coordinator.merged_ids:
        self.queue.mark_completed(merged_id)
        self._complete_issue_lifecycle_if_needed(merged_id)

    # Failed merges are recorded with their reason (or a generic fallback)
    for failed_id, reason in coordinator.failed_merges.items():
        self._worker_errors[failed_id] = reason or "Merge failed"
        self.queue.mark_failed(failed_id)

1089 

def _report_results(self, start_time: float) -> None:
    """Log a summary of the run and store its total duration.

    Sets ``self._execution_duration`` to the elapsed time, then logs counts
    of completed/failed/interrupted issues, an estimated speedup, failed and
    interrupted issue IDs, PR-ready branches, auto-correction statistics,
    and any stash-pop warnings reported by the merge coordinator.

    Args:
        start_time: When processing started (compared against ``time.time()``)
    """
    total_time = time.time() - start_time
    self._execution_duration = total_time

    self.logger.info("")
    self.logger.info("=" * 60)
    if self.wave_label:
        self.logger.info(f"{self.wave_label.upper()} PROCESSING COMPLETE")
    else:
        self.logger.info("PARALLEL ISSUE PROCESSING COMPLETE")
    self.logger.info("=" * 60)
    self.logger.info("")
    self.logger.timing(f"Total time: {format_duration(total_time)}")
    self.logger.info(f"Completed: {self.queue.completed_count}")
    self.logger.info(f"Failed: {self.queue.failed_count}")
    if self._interrupted_issues:
        self.logger.info(f"Interrupted: {len(self._interrupted_issues)}")

    # Copy everything the worker callback writes under the state lock, so the
    # report never iterates a dict that is being modified.
    with self._state_lock:
        timing_snapshot = dict(self.state.timing)
        corrections_snapshot = dict(self.state.corrections)
        pr_ready_snapshot = dict(self._pr_ready_branches)

    if self.queue.completed_count > 0:
        total_issue_time = sum(t.get("total", 0) for t in timing_snapshot.values())
        # Skip the ratio when no time elapsed, which would divide by zero
        if total_issue_time > 0 and total_time > 0:
            speedup = total_issue_time / total_time
            self.logger.info(f"Estimated speedup: {speedup:.2f}x")

    if self.queue.failed_ids:
        self.logger.info("")
        self.logger.warning("Failed issues:")
        for issue_id in self.queue.failed_ids:
            self.logger.warning(f" - {issue_id}")

    # Report interrupted issues separately (ENH-036)
    if self._interrupted_issues:
        self.logger.info("")
        self.logger.info(f"Interrupted: {len(self._interrupted_issues)} (can retry)")
        for issue_id in self._interrupted_issues:
            self.logger.info(f" - {issue_id}")

    # Report PR-ready branches when use_feature_branches=True (ENH-665)
    if pr_ready_snapshot:
        self.logger.info("")
        self.logger.info(f"PR-ready: {len(pr_ready_snapshot)} branch(es)")
        for issue_id, branch in pr_ready_snapshot.items():
            self.logger.info(f" - {issue_id}: {branch}")

    # Report correction statistics for quality tracking (ENH-010)
    if corrections_snapshot:
        from collections import Counter

        total_corrected = len(corrections_snapshot)
        total_issues = self.queue.completed_count + self.queue.failed_count
        correction_rate = (total_corrected / total_issues * 100) if total_issues > 0 else 0
        self.logger.info("")
        self.logger.info(
            f"Auto-corrections: {total_corrected}/{total_issues} ({correction_rate:.1f}%)"
        )

        # Tally corrections by their "[category]" prefix; anything without
        # such a prefix is counted as "uncategorized".
        all_corrections: list[str] = []
        by_category: Counter[str] = Counter()
        for corrections in corrections_snapshot.values():
            all_corrections.extend(corrections)
            for correction in corrections:
                if correction.startswith("[") and "]" in correction:
                    by_category[correction[1 : correction.index("]")]] += 1
                else:
                    by_category["uncategorized"] += 1

        # Log corrections by type/category, most frequent first
        if by_category:
            self.logger.info("Corrections by type:")
            for category, count in by_category.most_common():
                self.logger.info(f" - {category}: {count}")

        # Log the three most common individual corrections
        if all_corrections:
            self.logger.info("Most common corrections:")
            for correction, count in Counter(all_corrections).most_common(3):
                # Truncate long correction descriptions
                display = correction[:60] + "..." if len(correction) > 60 else correction
                self.logger.info(f" - {display}: {count}")

    # Report stash pop warnings (local changes need manual recovery)
    stash_warnings = self.merge_coordinator.stash_pop_failures
    if stash_warnings:
        self.logger.info("")
        self.logger.warning("Stash recovery warnings (local changes need manual restoration):")
        for issue_id, message in stash_warnings.items():
            self.logger.warning(f" - {issue_id}: {message}")
        self.logger.warning("")
        self.logger.warning(
            "To recover: Run 'git stash list' to find your changes, "
            "then 'git stash pop' or 'git stash apply stash@{N}'"
        )

1195 

1196 def _complete_issue_lifecycle_if_needed(self, issue_id: str) -> bool: 

1197 """Complete issue lifecycle by writing ``status: done`` to frontmatter. 

1198 

1199 Args: 

1200 issue_id: ID of the issue to complete 

1201 

1202 Returns: 

1203 True if lifecycle was completed (or already complete), False on error 

1204 """ 

1205 info = self._issue_info_by_id.get(issue_id) 

1206 if not info: 

1207 self.logger.warning(f"No issue info found for {issue_id}") 

1208 return False 

1209 

1210 original_path = info.path 

1211 

1212 if not original_path.exists(): 

1213 return True 

1214 

1215 self.logger.info(f"Completing lifecycle for {issue_id} (frontmatter status update)") 

1216 

1217 try: 

1218 content = original_path.read_text() 

1219 

1220 # Add resolution section if not already present 

1221 if "## Resolution" not in content: 

1222 action = self.br_config.get_category_action(info.issue_type) 

1223 resolution = f""" 

1224 

1225--- 

1226 

1227## Resolution 

1228 

1229- **Action**: {action} 

1230- **Completed**: {datetime.now(UTC).strftime("%Y-%m-%d")} 

1231- **Status**: Completed (parallel merge fallback) 

1232- **Implementation**: Merged from parallel worker branch 

1233 

1234### Changes Made 

1235- See git history for implementation details 

1236 

1237### Verification Results 

1238- Work verification passed before merge 

1239 

1240### Commits 

1241- See `git log --oneline` for merge commit details 

1242""" 

1243 content += resolution 

1244 

1245 content = update_frontmatter( 

1246 content, 

1247 { 

1248 "status": "done", 

1249 "completed_at": datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ"), 

1250 }, 

1251 ) 

1252 original_path.write_text(content) 

1253 append_session_log_entry(original_path, "ll-parallel") 

1254 

1255 # Stage and commit 

1256 self._git_lock.run( 

1257 ["add", "-A"], 

1258 cwd=self.repo_path, 

1259 ) 

1260 

1261 action = self.br_config.get_category_action(info.issue_type) 

1262 commit_msg = f"""{action}({info.issue_type}): complete {issue_id} lifecycle 

1263 

1264Parallel merge fallback - status: done written to frontmatter. 

1265 

1266Issue: {issue_id} 

1267Type: {info.issue_type} 

1268Title: {info.title} 

1269""" 

1270 commit_result = self._git_lock.run( 

1271 ["commit", "-m", commit_msg], 

1272 cwd=self.repo_path, 

1273 ) 

1274 

1275 if commit_result.returncode != 0: 

1276 if "nothing to commit" not in commit_result.stdout.lower(): 

1277 self.logger.warning(f"git commit failed: {commit_result.stderr}") 

1278 else: 

1279 commit_hash_match = re.search(r"\[[\w-]+\s+([a-f0-9]+)\]", commit_result.stdout) 

1280 if commit_hash_match: 

1281 self.logger.success( 

1282 f"Completed lifecycle for {issue_id}: {commit_hash_match.group(1)}" 

1283 ) 

1284 else: 

1285 self.logger.success(f"Completed lifecycle for {issue_id}") 

1286 

1287 return True 

1288 

1289 except Exception as e: 

1290 self.logger.error(f"Failed to complete lifecycle for {issue_id}: {e}") 

1291 return False 

1292 

1293 def _cleanup(self) -> None: 

1294 """Clean up resources.""" 

1295 self.logger.info("Cleaning up...") 

1296 

1297 # Save final state (force=True bypasses throttle to ensure shutdown state is persisted) 

1298 self._save_state(force=True) 

1299 

1300 # Shutdown components 

1301 self.worker_pool.shutdown(wait=True) 

1302 self.merge_coordinator.shutdown(wait=True, timeout=30) 

1303 

1304 # Flush transports regardless of interrupt state so events are not lost. 

1305 if self._event_bus is not None: 

1306 self._event_bus.close_transports() 

1307 

1308 # Clean up worktrees if not interrupted 

1309 if not self._shutdown_requested: 

1310 self.worker_pool.cleanup_all_worktrees()