Coverage for little_loops / parallel / worker_pool.py: 89%

507 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""Worker pool for parallel issue processing with git worktree isolation. 

2 

3Each worker operates in an isolated git worktree, allowing concurrent issue 

4processing without file conflicts. 

5""" 

6 

7from __future__ import annotations 

8 

9import json 

10import os 

11import re 

12import subprocess 

13import sys 

14import threading 

15import time 

16from collections.abc import Callable 

17from concurrent.futures import Future, ThreadPoolExecutor 

18from datetime import datetime 

19from pathlib import Path 

20from typing import TYPE_CHECKING, Any, cast 

21 

22from little_loops.host_runner import resolve_host 

23from little_loops.output_parsing import parse_ready_issue_output 

24from little_loops.parallel.git_lock import GitLock 

25from little_loops.parallel.types import ParallelConfig, WorkerResult, WorkerStage 

26from little_loops.subprocess_utils import ( 

27 assemble_guillotine_prompt, 

28 detect_context_handoff, 

29 read_continuation_prompt, 

30 read_sentinel, 

31 write_sentinel, 

32) 

33from little_loops.subprocess_utils import ( 

34 run_claude_command as _run_claude_base, 

35) 

36from little_loops.work_verification import EXCLUDED_DIRECTORIES, verify_work_was_done 

37 

38if TYPE_CHECKING: 

39 from little_loops.config import BRConfig 

40 from little_loops.issue_parser import IssueInfo 

41 from little_loops.logger import Logger 

42 

43 

44class WorkerPool: 

45 """Thread pool for processing issues in isolated git worktrees. 

46 

47 Each worker: 

48 1. Creates a dedicated git worktree and branch 

49 2. Runs issue validation and implementation via Claude CLI 

50 3. Commits changes locally 

51 4. Returns results for merge coordination 

52 

53 Example: 

54 >>> pool = WorkerPool(parallel_config, br_config, logger) 

55 >>> pool.start() 

56 >>> future = pool.submit(issue_info) 

57 >>> result = future.result() # WorkerResult 

58 >>> pool.shutdown() 

59 """ 

60 

61 def __init__( 

62 self, 

63 parallel_config: ParallelConfig, 

64 br_config: BRConfig, 

65 logger: Logger, 

66 repo_path: Path | None = None, 

67 git_lock: GitLock | None = None, 

68 ) -> None: 

69 """Initialize the worker pool. 

70 

71 Args: 

72 parallel_config: Parallel processing configuration 

73 br_config: Project configuration (for category actions) 

74 logger: Logger for worker output 

75 repo_path: Path to the git repository (default: current directory) 

76 git_lock: Shared lock for git operations (created if not provided) 

77 """ 

78 self.parallel_config = parallel_config 

79 self.br_config = br_config 

80 self.logger = logger 

81 self.repo_path = repo_path or Path.cwd() 

82 self._git_lock = git_lock or GitLock(logger) 

83 self._executor: ThreadPoolExecutor | None = None 

84 self._active_workers: dict[str, Future[WorkerResult]] = {} 

85 # Track active subprocesses for forceful termination on shutdown 

86 self._active_processes: dict[str, subprocess.Popen[str]] = {} 

87 # Track active worktree paths to prevent cleanup while in use (BUG-142) 

88 self._active_worktrees: set[Path] = set() 

89 self._process_lock = threading.Lock() 

90 # Track callbacks currently executing 

91 self._pending_callbacks: set[str] = set() 

92 self._callback_lock = threading.Lock() 

93 # Shutdown tracking for interrupted worker detection (ENH-036) 

94 self._shutdown_requested = False 

95 self._terminated_during_shutdown: set[str] = set() 

96 # Track worker processing stages for progress visibility (ENH-262) 

97 self._worker_stages: dict[str, WorkerStage] = {} 

98 

99 def start(self) -> None: 

100 """Start the worker pool.""" 

101 if self._executor is not None: 

102 return 

103 

104 # Ensure worktree base directory exists 

105 worktree_base = self.repo_path / self.parallel_config.worktree_base 

106 worktree_base.mkdir(parents=True, exist_ok=True) 

107 

108 self._executor = ThreadPoolExecutor( 

109 max_workers=self.parallel_config.max_workers, 

110 thread_name_prefix="issue-worker", 

111 ) 

112 self.logger.info(f"Worker pool started with {self.parallel_config.max_workers} workers") 

113 

114 def shutdown(self, wait: bool = True) -> None: 

115 """Shutdown the worker pool. 

116 

117 Args: 

118 wait: Whether to wait for pending tasks to complete 

119 """ 

120 if self._executor is None: 

121 return 

122 

123 self.logger.info("Shutting down worker pool...") 

124 

125 # First, terminate all active subprocesses to unblock worker threads 

126 if not wait: 

127 self.terminate_all_processes() 

128 

129 self._executor.shutdown(wait=wait) 

130 self._executor = None 

131 

132 def set_shutdown_requested(self, value: bool = True) -> None: 

133 """Set the shutdown flag. 

134 

135 Called by orchestrator during shutdown to enable tracking of 

136 workers that are terminated due to shutdown vs. actual failures. 

137 """ 

138 self._shutdown_requested = value 

139 

140 def terminate_all_processes(self) -> None: 

141 """Forcefully terminate all active subprocesses. 

142 

143 Called when we need to abort workers immediately, 

144 such as on timeout or shutdown. 

145 """ 

146 with self._process_lock: 

147 for issue_id, process in list(self._active_processes.items()): 

148 if process.poll() is None: # Still running 

149 self.logger.warning( 

150 f"Terminating subprocess for {issue_id} (PID {process.pid})" 

151 ) 

152 # Track issues terminated during shutdown for interrupted detection (ENH-036) 

153 if self._shutdown_requested: 

154 self._terminated_during_shutdown.add(issue_id) 

155 try: 

156 # Send SIGTERM first for graceful termination 

157 process.terminate() 

158 try: 

159 process.wait(timeout=5) 

160 except subprocess.TimeoutExpired: 

161 # Force kill if SIGTERM didn't work 

162 self.logger.warning(f"Force killing {issue_id} (PID {process.pid})") 

163 process.kill() 

164 process.wait(timeout=2) 

165 except Exception as e: 

166 self.logger.error(f"Failed to terminate {issue_id}: {e}") 

167 self._active_processes.clear() 

168 

169 def submit( 

170 self, 

171 issue: IssueInfo, 

172 on_complete: Callable[[WorkerResult], None] | None = None, 

173 ) -> Future[WorkerResult]: 

174 """Submit an issue for processing. 

175 

176 Args: 

177 issue: Issue to process 

178 on_complete: Optional callback when processing completes 

179 

180 Returns: 

181 Future that will contain the WorkerResult 

182 """ 

183 if self._executor is None: 

184 raise RuntimeError("Worker pool not started") 

185 

186 future = self._executor.submit(self._process_issue, issue) 

187 with self._process_lock: 

188 self._active_workers[issue.issue_id] = future 

189 

190 if on_complete: 

191 future.add_done_callback( 

192 lambda f: self._handle_completion(f, on_complete, issue.issue_id) 

193 ) 

194 

195 return future 

196 

197 def _handle_completion( 

198 self, 

199 future: Future[WorkerResult], 

200 callback: Callable[[WorkerResult], None], 

201 issue_id: str, 

202 ) -> None: 

203 """Handle worker completion and invoke callback.""" 

204 with self._callback_lock: 

205 self._pending_callbacks.add(issue_id) 

206 try: 

207 try: 

208 result = future.result() 

209 except Exception as e: 

210 self.logger.error(f"Worker future failed for {issue_id}: {e}") 

211 result = WorkerResult( 

212 issue_id=issue_id, 

213 success=False, 

214 branch_name="", 

215 worktree_path=Path(), 

216 error=f"Worker future failed: {e}", 

217 ) 

218 # Set final stage based on result (ENH-262) 

219 if result.success: 

220 self.set_worker_stage(issue_id, WorkerStage.COMPLETED) 

221 elif result.interrupted: 

222 self.set_worker_stage(issue_id, WorkerStage.INTERRUPTED) 

223 else: 

224 self.set_worker_stage(issue_id, WorkerStage.FAILED) 

225 try: 

226 callback(result) 

227 except Exception as e: 

228 self.logger.error(f"Worker completion callback failed for {issue_id}: {e}") 

229 finally: 

230 with self._callback_lock: 

231 self._pending_callbacks.discard(issue_id) 

232 

233 def _process_issue(self, issue: IssueInfo) -> WorkerResult: 

234 """Process a single issue in an isolated worktree. 

235 

236 Args: 

237 issue: Issue to process 

238 

239 Returns: 

240 WorkerResult with processing outcome 

241 """ 

242 start_time = time.time() 

243 timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") 

244 if self.parallel_config.use_feature_branches: 

245 from little_loops.issue_parser import slugify 

246 

247 branch_name = f"feature/{issue.issue_id.lower()}-{slugify(issue.title)}" 

248 else: 

249 branch_name = f"parallel/{issue.issue_id.lower()}-{timestamp}" 

250 worktree_path = ( 

251 self.repo_path 

252 / self.parallel_config.worktree_base 

253 / f"worker-{issue.issue_id.lower()}-{timestamp}" 

254 ) 

255 

256 # Set initial stage for progress tracking (ENH-262) 

257 self.set_worker_stage(issue.issue_id, WorkerStage.SETUP) 

258 

259 # Capture baseline of main repo status before worker starts 

260 # Used to detect files incorrectly written to main repo 

261 baseline_status = self._get_main_repo_baseline() 

262 # Capture main HEAD SHA before worker starts to detect committed leaks 

263 baseline_head_sha = self._get_main_head_sha() 

264 

265 try: 

266 # Step 1: Create worktree with new branch 

267 self._setup_worktree(worktree_path, branch_name) 

268 

269 # Register worktree as active to prevent cleanup while in use (BUG-142) 

270 with self._process_lock: 

271 self._active_worktrees.add(worktree_path) 

272 

273 # Update stage for progress tracking (ENH-262) 

274 self.set_worker_stage(issue.issue_id, WorkerStage.VALIDATING) 

275 

276 # Step 2: Run ready-issue validation 

277 ready_cmd = self.parallel_config.get_ready_command(issue.issue_id) 

278 ready_result = self._run_claude_command( 

279 ready_cmd, 

280 worktree_path, 

281 issue_id=issue.issue_id, 

282 ) 

283 

284 # Check if worker was terminated during shutdown (ENH-036) 

285 if issue.issue_id in self._terminated_during_shutdown: 

286 self.set_worker_stage(issue.issue_id, WorkerStage.INTERRUPTED) 

287 return WorkerResult( 

288 issue_id=issue.issue_id, 

289 success=False, 

290 interrupted=True, 

291 branch_name=branch_name, 

292 worktree_path=worktree_path, 

293 duration=time.time() - start_time, 

294 error="Interrupted during shutdown", 

295 stdout=ready_result.stdout, 

296 stderr=ready_result.stderr, 

297 ) 

298 

299 if ready_result.returncode != 0: 

300 err_detail = ready_result.stderr or (ready_result.stdout or "")[:500] 

301 return WorkerResult( 

302 issue_id=issue.issue_id, 

303 success=False, 

304 branch_name=branch_name, 

305 worktree_path=worktree_path, 

306 duration=time.time() - start_time, 

307 error=f"ready-issue failed: {err_detail}", 

308 stdout=ready_result.stdout, 

309 stderr=ready_result.stderr, 

310 ) 

311 

312 # Step 3: Parse ready-issue output and check verdict 

313 ready_parsed = parse_ready_issue_output(ready_result.stdout) 

314 

315 # Handle CLOSE verdict - issue should not be implemented 

316 if ready_parsed.get("should_close"): 

317 return WorkerResult( 

318 issue_id=issue.issue_id, 

319 success=True, # Closure is a valid outcome 

320 branch_name=branch_name, 

321 worktree_path=worktree_path, 

322 duration=time.time() - start_time, 

323 should_close=True, 

324 close_reason=ready_parsed.get("close_reason"), 

325 close_status=ready_parsed.get("close_status"), 

326 stdout=ready_result.stdout, 

327 stderr=ready_result.stderr, 

328 ) 

329 

330 # Handle BLOCKED verdict - issue has open dependencies 

331 if ready_parsed.get("is_blocked"): 

332 return WorkerResult( 

333 issue_id=issue.issue_id, 

334 success=False, 

335 was_blocked=True, 

336 branch_name=branch_name, 

337 worktree_path=worktree_path, 

338 duration=time.time() - start_time, 

339 error="ready-issue verdict: BLOCKED - open dependency detected", 

340 stdout=ready_result.stdout, 

341 stderr=ready_result.stderr, 

342 ) 

343 

344 # Handle NOT_READY verdict 

345 if not ready_parsed["is_ready"]: 

346 concerns = ready_parsed.get("concerns", []) 

347 if concerns: 

348 concern_msg = "; ".join(concerns) 

349 elif ready_parsed["verdict"] == "UNKNOWN": 

350 # For UNKNOWN verdicts, show a snippet of output for debugging 

351 raw_out = (ready_result.stdout or "")[:200].strip() 

352 concern_msg = ( 

353 f"Could not parse verdict. Output: {raw_out}..." 

354 if raw_out 

355 else "No output from ready-issue" 

356 ) 

357 else: 

358 concern_msg = "Issue not ready" 

359 return WorkerResult( 

360 issue_id=issue.issue_id, 

361 success=False, 

362 branch_name=branch_name, 

363 worktree_path=worktree_path, 

364 duration=time.time() - start_time, 

365 error=f"ready-issue verdict: {ready_parsed['verdict']} - {concern_msg}", 

366 stdout=ready_result.stdout, 

367 stderr=ready_result.stderr, 

368 ) 

369 

370 # Track if issue was corrected (corrections stay in worktree) 

371 was_corrected = ready_parsed.get("was_corrected", False) 

372 corrections = ready_parsed.get("corrections", []) 

373 

374 # Update stage for progress tracking (ENH-262) 

375 self.set_worker_stage(issue.issue_id, WorkerStage.IMPLEMENTING) 

376 

377 # Decision gate: invoke decide-issue when the issue requires a decision 

378 if issue.decision_needed is True: 

379 decide_cmd = self.parallel_config.get_decide_command(issue.issue_id) 

380 decide_result = self._run_claude_command( 

381 decide_cmd, worktree_path, issue_id=issue.issue_id 

382 ) 

383 if decide_result.returncode != 0: 

384 self.logger.warning( 

385 f"[{issue.issue_id}] decide-issue command failed, " 

386 "continuing to implementation anyway..." 

387 ) 

388 

389 # Step 4: Get action from BRConfig 

390 action = self.br_config.get_category_action(issue.issue_type) 

391 

392 # Step 5: Run manage-issue implementation (with continuation support) 

393 manage_cmd = self.parallel_config.get_manage_command( 

394 issue.issue_type, action, issue.issue_id 

395 ) 

396 manage_result = self._run_with_continuation( 

397 manage_cmd, 

398 worktree_path, 

399 issue_id=issue.issue_id, 

400 ) 

401 

402 # Update stage for progress tracking (ENH-262) 

403 self.set_worker_stage(issue.issue_id, WorkerStage.VERIFYING) 

404 

405 # Check if worker was terminated during shutdown (ENH-036) 

406 if issue.issue_id in self._terminated_during_shutdown: 

407 self.set_worker_stage(issue.issue_id, WorkerStage.INTERRUPTED) 

408 return WorkerResult( 

409 issue_id=issue.issue_id, 

410 success=False, 

411 interrupted=True, 

412 branch_name=branch_name, 

413 worktree_path=worktree_path, 

414 duration=time.time() - start_time, 

415 error="Interrupted during shutdown", 

416 stdout=manage_result.stdout, 

417 stderr=manage_result.stderr, 

418 ) 

419 

420 # Step 6: Get list of changed files in worktree 

421 changed_files = self._get_changed_files(worktree_path) 

422 

423 # Step 8: Detect files leaked to main repo instead of worktree (unstaged) 

424 leaked_files = self._detect_main_repo_leaks(issue.issue_id, baseline_status) 

425 if leaked_files: 

426 self.logger.warning( 

427 f"{issue.issue_id} leaked {len(leaked_files)} file(s) to main repo: " 

428 f"{leaked_files}" 

429 ) 

430 # Clean up leaked files to prevent stash conflicts during merge. 

431 # The actual work is preserved in the worktree branch. 

432 self._cleanup_leaked_files(leaked_files) 

433 

434 # Step 8b: Detect commits made directly to main instead of worktree branch. 

435 # If Claude committed to main (not the worktree), worktree will have no diff, 

436 # causing work verification to fail. Attempt to recover by cherry-picking 

437 # the leaked commits to the worktree and resetting main. (BUG-580) 

438 committed_leaks = self._detect_committed_leaks(baseline_head_sha) 

439 if committed_leaks: 

440 self.logger.warning( 

441 f"{issue.issue_id} committed {len(committed_leaks)} commit(s) directly " 

442 f"to main instead of worktree: {[sha[:8] for sha in committed_leaks]}" 

443 ) 

444 if not changed_files: 

445 recovered = self._recover_committed_leaks( 

446 committed_leaks, worktree_path, baseline_head_sha, issue.issue_id 

447 ) 

448 if recovered: 

449 changed_files = self._get_changed_files(worktree_path) 

450 

451 # Step 7: Verify actual work was done (after potential committed-leak recovery) 

452 # Pass full filename for better doc-only keyword matching 

453 issue_filename = issue.path.stem if issue.path else "" 

454 work_verified, verification_error = self._verify_work_was_done( 

455 changed_files, issue.issue_id, issue_filename 

456 ) 

457 

458 if manage_result.returncode != 0: 

459 err_detail = manage_result.stderr or (manage_result.stdout or "")[:500] 

460 return WorkerResult( 

461 issue_id=issue.issue_id, 

462 success=False, 

463 branch_name=branch_name, 

464 worktree_path=worktree_path, 

465 changed_files=changed_files, 

466 leaked_files=leaked_files, 

467 duration=time.time() - start_time, 

468 error=f"manage-issue failed: {err_detail}", 

469 stdout=manage_result.stdout, 

470 stderr=manage_result.stderr, 

471 ) 

472 

473 if not work_verified: 

474 return WorkerResult( 

475 issue_id=issue.issue_id, 

476 success=False, 

477 branch_name=branch_name, 

478 worktree_path=worktree_path, 

479 changed_files=changed_files, 

480 leaked_files=leaked_files, 

481 duration=time.time() - start_time, 

482 error=verification_error, 

483 stdout=manage_result.stdout, 

484 stderr=manage_result.stderr, 

485 ) 

486 

487 # Step 9: Update branch base before merge (BUG-180) 

488 # Fetch origin/main and rebase to ensure branch is based on latest main 

489 base_updated, base_error = self._update_branch_base(worktree_path, issue.issue_id) 

490 

491 # Update stage for progress tracking (ENH-262) 

492 self.set_worker_stage(issue.issue_id, WorkerStage.MERGING) 

493 

494 if not base_updated: 

495 return WorkerResult( 

496 issue_id=issue.issue_id, 

497 success=False, 

498 branch_name=branch_name, 

499 worktree_path=worktree_path, 

500 changed_files=changed_files, 

501 leaked_files=leaked_files, 

502 duration=time.time() - start_time, 

503 error=base_error, 

504 stdout=manage_result.stdout, 

505 stderr=manage_result.stderr, 

506 ) 

507 

508 return WorkerResult( 

509 issue_id=issue.issue_id, 

510 success=True, 

511 branch_name=branch_name, 

512 worktree_path=worktree_path, 

513 changed_files=changed_files, 

514 leaked_files=leaked_files, 

515 duration=time.time() - start_time, 

516 error=None, 

517 stdout=manage_result.stdout, 

518 stderr=manage_result.stderr, 

519 was_corrected=was_corrected, 

520 corrections=corrections, 

521 ) 

522 

523 except Exception as e: 

524 return WorkerResult( 

525 issue_id=issue.issue_id, 

526 success=False, 

527 branch_name=branch_name, 

528 worktree_path=worktree_path, 

529 duration=time.time() - start_time, 

530 error=str(e), 

531 ) 

532 finally: 

533 # Unregister worktree as no longer active (BUG-142) 

534 with self._process_lock: 

535 self._active_worktrees.discard(worktree_path) 

536 

537 def _setup_worktree(self, worktree_path: Path, branch_name: str) -> None: 

538 """Create a git worktree with a new branch. 

539 

540 Args: 

541 worktree_path: Path for the new worktree 

542 branch_name: Name of the new branch 

543 """ 

544 from little_loops.worktree_utils import setup_worktree 

545 

546 setup_worktree( 

547 repo_path=self.repo_path, 

548 worktree_path=worktree_path, 

549 branch_name=branch_name, 

550 copy_files=self.parallel_config.worktree_copy_files, 

551 logger=self.logger, 

552 git_lock=self._git_lock, 

553 ) 

554 

555 # Verify model if --show-model flag is set (requires API call) 

556 if self.parallel_config.show_model: 

557 model = self._detect_worktree_model_via_api(worktree_path) 

558 if model: 

559 self.logger.info(f" Using model: {model}") 

560 else: 

561 self.logger.warning(" Could not detect Claude CLI model") 

562 

563 def _detect_worktree_model_via_api(self, worktree_path: Path) -> str | None: 

564 """Detect the model Claude will use by making an API call. 

565 

566 Runs a minimal Claude command with JSON output and parses the modelUsage 

567 field to verify settings.local.json is being respected. 

568 

569 Args: 

570 worktree_path: Path to the worktree to test 

571 

572 Returns: 

573 Model name (e.g., "claude-sonnet-4-20250514") or None if unable to detect 

574 """ 

575 try: 

576 invocation = resolve_host().build_blocking_json(prompt="reply with just 'ok'") 

577 # No-perm-skip preserved: this is a detection probe, not a real run. 

578 args = [a for a in invocation.args if a != "--dangerously-skip-permissions"] 

579 

580 # Set environment to keep Claude in the project working directory (BUG-007) 

581 # This ensures the first Claude CLI invocation in the worktree has the same 

582 # project root behavior as subsequent invocations via run_claude_command() 

583 env = os.environ.copy() 

584 env["CLAUDE_BASH_MAINTAIN_PROJECT_WORKING_DIR"] = "1" 

585 env.update(invocation.env) 

586 

587 result = subprocess.run( 

588 [invocation.binary, *args], 

589 cwd=worktree_path, 

590 capture_output=True, 

591 text=True, 

592 timeout=30, 

593 env=env, 

594 ) 

595 if result.returncode == 0 and result.stdout.strip(): 

596 data: dict[str, Any] = json.loads(result.stdout.strip()) 

597 model_usage: dict[str, Any] = data.get("modelUsage", {}) 

598 # Return the first (primary) model from modelUsage 

599 if model_usage: 

600 return cast(str, next(iter(model_usage.keys()))) 

601 except (subprocess.TimeoutExpired, FileNotFoundError, json.JSONDecodeError): 

602 pass 

603 return None 

604 

605 def _cleanup_worktree(self, worktree_path: Path) -> None: 

606 """Remove a git worktree and its associated branch. 

607 

608 Args: 

609 worktree_path: Path to the worktree to remove 

610 """ 

611 if not worktree_path.exists(): 

612 return 

613 

614 # Skip cleanup if worktree is actively in use by a running worker (BUG-142) 

615 with self._process_lock: 

616 if worktree_path in self._active_worktrees: 

617 self.logger.warning( 

618 f"Skipping cleanup of {worktree_path.name}: worktree is in active use" 

619 ) 

620 return 

621 

622 # Only delete branches with the parallel/ prefix (legacy behavior for ll-parallel) 

623 branch_result = subprocess.run( 

624 ["git", "rev-parse", "--abbrev-ref", "HEAD"], 

625 cwd=worktree_path, 

626 capture_output=True, 

627 text=True, 

628 ) 

629 branch_name = branch_result.stdout.strip() if branch_result.returncode == 0 else None 

630 delete_branch = branch_name is not None and branch_name.startswith("parallel/") 

631 

632 from little_loops.worktree_utils import cleanup_worktree 

633 

634 cleanup_worktree( 

635 worktree_path=worktree_path, 

636 repo_path=self.repo_path, 

637 logger=self.logger, 

638 git_lock=self._git_lock, 

639 delete_branch=delete_branch, 

640 ) 

641 

642 def _run_claude_command( 

643 self, 

644 command: str, 

645 working_dir: Path, 

646 issue_id: str | None = None, 

647 on_usage: Callable[[int, int], None] | None = None, 

648 resume_session: bool = False, 

649 ) -> subprocess.CompletedProcess[str]: 

650 """Run a Claude CLI command with real-time output streaming. 

651 

652 Args: 

653 command: The command to run (e.g., "/ll:ready-issue BUG-123") 

654 working_dir: Directory to run the command in 

655 issue_id: Optional issue ID for subprocess tracking 

656 on_usage: Optional usage callback for token tracking 

657 resume_session: If True, passes --continue to the Claude CLI 

658 

659 Returns: 

660 CompletedProcess with stdout and stderr 

661 """ 

662 stream_output = self.parallel_config.stream_subprocess_output 

663 

664 def stream_callback(line: str, is_stderr: bool) -> None: 

665 if stream_output: 

666 if is_stderr: 

667 print(f" {line}", file=sys.stderr) 

668 else: 

669 self.logger.info(f" {line}") 

670 

671 def on_start(process: subprocess.Popen[str]) -> None: 

672 if issue_id: 

673 with self._process_lock: 

674 self._active_processes[issue_id] = process 

675 

676 def on_end(process: subprocess.Popen[str]) -> None: 

677 if issue_id: 

678 with self._process_lock: 

679 self._active_processes.pop(issue_id, None) 

680 

681 return _run_claude_base( 

682 command=command, 

683 timeout=self.parallel_config.timeout_per_issue, 

684 working_dir=working_dir, 

685 stream_callback=stream_callback if stream_output else None, 

686 on_process_start=on_start if issue_id else None, 

687 on_process_end=on_end if issue_id else None, 

688 idle_timeout=self.parallel_config.idle_timeout_per_issue, 

689 on_usage=on_usage, 

690 resume_session=resume_session, 

691 ) 

692 

693 def _run_with_continuation( 

694 self, 

695 command: str, 

696 working_dir: Path, 

697 issue_id: str | None = None, 

698 max_continuations: int = 3, 

699 context_limit: int = 200_000, 

700 sentinel_threshold: float = 0.60, 

701 guillotine_threshold: float = 0.90, 

702 ) -> subprocess.CompletedProcess[str]: 

703 """Run a Claude command with automatic continuation on context handoff. 

704 

705 Mirrors the E+G+J logic in issue_manager.run_with_continuation. 

706 

707 Args: 

708 command: The command to run 

709 working_dir: Directory (worktree) to run the command in 

710 issue_id: Optional issue ID for subprocess tracking 

711 max_continuations: Maximum number of continuation attempts 

712 context_limit: Context window size in tokens 

713 sentinel_threshold: Write sentinel when usage >= this fraction 

714 guillotine_threshold: Trigger J-path when usage >= this fraction 

715 

716 Returns: 

717 Combined CompletedProcess with all session outputs 

718 """ 

719 all_stdout: list[str] = [] 

720 all_stderr: list[str] = [] 

721 current_command = command 

722 continuation_count = 0 

723 result: subprocess.CompletedProcess[str] = subprocess.CompletedProcess( 

724 args=[], returncode=1, stdout="", stderr="" 

725 ) 

726 tag = f"[{issue_id}]" if issue_id else "[worker]" 

727 

728 # Track token usage per-round for sentinel/guillotine thresholds 

729 _last_input: list[int] = [0] 

730 _last_output: list[int] = [0] 

731 

732 def _usage_tracker(input_tokens: int, output_tokens: int) -> None: 

733 _last_input[0] = input_tokens 

734 _last_output[0] = output_tokens 

735 

736 while continuation_count <= max_continuations: 

737 result = self._run_claude_command( 

738 current_command, 

739 working_dir, 

740 issue_id=issue_id, 

741 on_usage=_usage_tracker, 

742 ) 

743 

744 all_stdout.append(result.stdout) 

745 all_stderr.append(result.stderr) 

746 

747 # Standard path: Claude emitted CONTEXT_HANDOFF 

748 if detect_context_handoff(result.stdout): 

749 self.logger.info(f"{tag} Detected CONTEXT_HANDOFF signal") 

750 

751 prompt_content = read_continuation_prompt(working_dir) 

752 if not prompt_content: 

753 self.logger.warning( 

754 f"{tag} Context handoff signaled but no continuation prompt found" 

755 ) 

756 all_stderr.append("Handoff detected but no continuation prompt found") 

757 result = subprocess.CompletedProcess( 

758 args=result.args, returncode=1, stdout=result.stdout, stderr=result.stderr 

759 ) 

760 break 

761 

762 if continuation_count >= max_continuations: 

763 self.logger.warning( 

764 f"{tag} Reached max continuations ({max_continuations}), stopping" 

765 ) 

766 break 

767 

768 continuation_count += 1 

769 self.logger.info(f"{tag} Starting continuation session #{continuation_count}") 

770 current_command = f"{command} --resume" 

771 continue 

772 

773 total_tokens = _last_input[0] + _last_output[0] 

774 usage_ratio = total_tokens / context_limit if context_limit > 0 else 0.0 

775 prompt_too_long = "prompt is too long" in (result.stderr or "").lower() 

776 

777 # Option J: guillotine — fresh session with transcript-summary prompt 

778 if ( 

779 prompt_too_long or usage_ratio >= guillotine_threshold 

780 ) and continuation_count < max_continuations: 

781 trigger_reason = ( 

782 "Prompt is too long" if prompt_too_long else f"usage {usage_ratio:.0%}" 

783 ) 

784 self.logger.warning( 

785 f"{tag} Option J triggered ({trigger_reason}): spawning fresh session" 

786 ) 

787 try: 

788 guillotine_cmd = assemble_guillotine_prompt( 

789 original_command=command, 

790 captured_stdout="\n---CONTINUATION---\n".join(all_stdout), 

791 token_stats={ 

792 "input_tokens": _last_input[0], 

793 "output_tokens": _last_output[0], 

794 "context_limit": context_limit, 

795 "trigger_reason": trigger_reason, 

796 }, 

797 ) 

798 except Exception as exc: 

799 self.logger.warning( 

800 f"{tag} Failed to assemble guillotine prompt ({exc}), using bare restart" 

801 ) 

802 guillotine_cmd = command 

803 continuation_count += 1 

804 current_command = guillotine_cmd 

805 _last_input[0] = 0 

806 _last_output[0] = 0 

807 continue 

808 

809 # Option E: read sentinel from a PREVIOUS session (must run before G writes 

810 # the current-session sentinel to avoid immediately consuming our own write). 

811 sentinel_data = read_sentinel(working_dir) 

812 if sentinel_data is not None and continuation_count < max_continuations: 

813 usage_pct = sentinel_data.get("usage_percent", int(usage_ratio * 100)) 

814 self.logger.info( 

815 f"{tag} Sentinel detected ({usage_pct}% context used): " 

816 "sending explicit handoff instruction" 

817 ) 

818 continuation_count += 1 

819 explicit_handoff_instruction = ( 

820 f"Context limit is approaching ({usage_pct}% of the context window is used). " 

821 "Please run /ll:handoff RIGHT NOW to save your progress to " 

822 ".ll/ll-continue-prompt.md, then output " 

823 '"CONTEXT_HANDOFF: Ready for fresh session" to signal continuation.' 

824 ) 

825 _last_input[0] = 0 

826 _last_output[0] = 0 

827 result = self._run_claude_command( 

828 explicit_handoff_instruction, 

829 working_dir, 

830 issue_id=issue_id, 

831 on_usage=_usage_tracker, 

832 resume_session=True, 

833 ) 

834 all_stdout.append(result.stdout) 

835 all_stderr.append(result.stderr) 

836 

837 if detect_context_handoff(result.stdout): 

838 self.logger.info( 

839 f"{tag} CONTEXT_HANDOFF detected after explicit handoff instruction" 

840 ) 

841 prompt_content = read_continuation_prompt(working_dir) 

842 if prompt_content and continuation_count < max_continuations: 

843 continuation_count += 1 

844 self.logger.info( 

845 f"{tag} Starting continuation session #{continuation_count}" 

846 ) 

847 current_command = f"{command} --resume" 

848 _last_input[0] = 0 

849 _last_output[0] = 0 

850 continue 

851 break 

852 

853 # Option G (Python layer): write sentinel for the NEXT session. 

854 # Placed after E-path so we don't immediately consume our own write. 

855 if total_tokens > 0 and usage_ratio >= sentinel_threshold: 

856 self.logger.info( 

857 f"{tag} Writing context-handoff sentinel ({usage_ratio:.0%} context used)" 

858 ) 

859 write_sentinel(working_dir, token_count=total_tokens, context_limit=context_limit) 

860 

861 # No handoff signal, no prior-session sentinel, no overflow — done 

862 break 

863 

864 return subprocess.CompletedProcess( 

865 args=result.args, 

866 returncode=result.returncode, 

867 stdout="\n---CONTINUATION---\n".join(all_stdout), 

868 stderr="\n---CONTINUATION---\n".join(all_stderr), 

869 ) 

870 

def _get_changed_files(self, worktree_path: Path) -> list[str]:
    """Get list of files changed on the worktree's branch.

    Lists the files that differ between HEAD and its merge-base with the
    configured base branch (``git diff <base>...HEAD``). Diffing against the
    merge-base rather than the base branch tip means commits that land on the
    base branch after this worktree was created (e.g. other workers' merges)
    are not reported as changes made in this worktree.

    Args:
        worktree_path: Path to the worktree

    Returns:
        List of changed file paths relative to repo root, or an empty list
        if the git command fails.
    """
    base = self.parallel_config.base_branch
    # Three-dot form: compare HEAD with merge-base(base, HEAD), not base's tip.
    result = subprocess.run(
        ["git", "diff", "--name-only", f"{base}...HEAD"],
        cwd=worktree_path,
        capture_output=True,
        text=True,
        timeout=30,
    )

    if result.returncode != 0:
        return []

    return [name.strip() for name in result.stdout.splitlines() if name.strip()]

892 

def _update_branch_base(self, worktree_path: Path, issue_id: str) -> tuple[bool, str]:
    """Fetch the configured base branch and rebase the worker branch onto it.

    This ensures the worker branch is based on the latest base branch before
    merge coordination, preventing conflicts when the base advances during
    sprint execution (BUG-180).

    The base branch (``parallel_config.base_branch``) is fetched from
    ``parallel_config.remote_name``. If the fetch fails or times out, the
    rebase falls back to the local base branch. A failed or timed-out rebase
    is aborted so the worktree is not left in the middle of a rebase.

    Args:
        worktree_path: Path to the worker's worktree
        issue_id: Issue ID for logging

    Returns:
        Tuple of (success, error_message); error_message is "" on success.
    """
    base = self.parallel_config.base_branch
    remote = self.parallel_config.remote_name

    def _abort_rebase() -> None:
        # Best-effort cleanup; a hanging abort must not mask the original error.
        try:
            subprocess.run(
                ["git", "rebase", "--abort"],
                cwd=worktree_path,
                capture_output=True,
                timeout=10,
            )
        except subprocess.TimeoutExpired:
            self.logger.warning(f"[{issue_id}] Timed out aborting rebase in {worktree_path}")

    # Fetch latest base branch from the configured remote. Any failure,
    # including a network timeout, falls back to the local base branch.
    try:
        fetch_result = subprocess.run(
            ["git", "fetch", remote, base],
            cwd=worktree_path,
            capture_output=True,
            text=True,
            timeout=60,
        )
        fetched = fetch_result.returncode == 0
    except subprocess.TimeoutExpired:
        fetched = False
        self.logger.warning(
            f"[{issue_id}] Timed out fetching {remote}/{base}; rebasing onto local {base}"
        )

    rebase_target = f"{remote}/{base}" if fetched else base

    # Rebase current branch onto base (remote or local fallback)
    try:
        rebase_result = subprocess.run(
            ["git", "rebase", rebase_target],
            cwd=worktree_path,
            capture_output=True,
            text=True,
            timeout=120,
        )
    except subprocess.TimeoutExpired:
        _abort_rebase()
        return False, f"Timed out rebasing onto {rebase_target}"

    if rebase_result.returncode != 0:
        _abort_rebase()
        return False, f"Failed to rebase onto {rebase_target}: {rebase_result.stderr}"

    self.logger.info(f"[{issue_id}] Rebased branch onto {rebase_target}")
    return True, ""

941 

def _verify_work_was_done(
    self, changed_files: list[str], issue_id: str, issue_filename: str = ""
) -> tuple[bool, str]:
    """Decide whether the worker's changes amount to real implementation work.

    An empty change list always fails. When ``require_code_changes`` is off,
    any non-empty change list is accepted. Otherwise the judgement is
    delegated to the shared ``verify_work_was_done()`` helper.

    Args:
        changed_files: Paths changed while processing the issue
        issue_id: Issue being processed (unused; retained for compatibility)
        issue_filename: Issue file name (unused; retained for compatibility)

    Returns:
        ``(True, "")`` when the work is accepted, otherwise ``(False, reason)``.
        On rejection, ``reason`` names up to five of the changed files that
        fall under ``EXCLUDED_DIRECTORIES``.
    """
    if not changed_files:
        return False, "No files were changed during implementation"

    require_code = self.parallel_config.require_code_changes
    if not require_code or verify_work_was_done(self.logger, changed_files):
        return True, ""

    # Build a readable rejection reason from the files that were excluded.
    excluded = [
        path
        for path in changed_files
        if path and any(map(path.startswith, EXCLUDED_DIRECTORIES))
    ]
    if not excluded:
        return False, "Only excluded files modified (e.g., .issues/, thoughts/)"

    preview = ", ".join(excluded[:5])
    overflow = len(excluded) - 5
    if overflow > 0:
        preview = f"{preview} (+{overflow} more)"
    return False, f"Only excluded files modified: {preview}"

981 

def _has_other_issue_id(self, file_lower: str, current_issue_id_lower: str) -> bool:
    """Check if file contains a different issue ID than the current worker's.

    This prevents cross-worker contamination where worker A detects worker B's
    leaked file. When multiple workers run in parallel, their leaked files may
    both appear in the main repo. Each worker should only clean up its own leaks.

    An issue ID (``bug-N``, ``enh-N``, ``feat-N``, ``epic-N``) is only
    recognised when its type prefix is not immediately preceded by another
    letter. Words such as ``debug-42`` are therefore not mistaken for ``bug-42``.

    Args:
        file_lower: Lowercase file path to check
        current_issue_id_lower: Lowercase issue ID of the current worker

    Returns:
        True if the file contains issue ID(s) and none is the current one
        (it belongs to another worker); False if the file contains the
        current issue ID or no recognizable issue ID.
    """
    # (?<![a-z]) rejects matches embedded in longer words ("debug-7");
    # the non-capturing group makes findall return the full ID.
    found_ids = re.findall(r"(?<![a-z])(?:bug|enh|feat|epic)-\d+", file_lower)

    if not found_ids:
        # No issue ID found - file doesn't belong to any specific worker
        return False

    # Belongs to another worker only if none of the IDs is ours
    return current_issue_id_lower not in found_ids

1012 

@staticmethod
def _parse_porcelain_paths(porcelain: str) -> set[str]:
    """Extract file paths from ``git status --porcelain`` output.

    Each entry has the form ``XY PATH``. Renames have the form
    ``XY OLD -> NEW``, and only the new path is kept. The output is split
    into lines without stripping it as a whole first. The status columns may
    begin with a space (e.g. `` M file`` for an unstaged modification), and
    stripping the whole output would shift the first entry and truncate its
    path.

    Args:
        porcelain: Raw stdout of ``git status --porcelain``

    Returns:
        Set of file paths reported by git status
    """
    paths: set[str] = set()
    for line in porcelain.splitlines():
        if len(line) < 3:
            continue
        # Path follows the two status columns and one separator space
        path = line[3:].strip()
        # Handle renamed files (old -> new)
        if " -> " in path:
            path = path.split(" -> ")[-1]
        paths.add(path)
    return paths

def _detect_main_repo_leaks(self, issue_id: str, baseline_status: set[str]) -> list[str]:
    """Detect files incorrectly written to main repo instead of worktree.

    Claude Code may write files to the main repository instead of the
    worktree due to project root detection issues (see GitHub #8771).
    This method detects such leaks by comparing main repo status before
    and after worker execution.

    Only paths that are new since ``baseline_status`` are considered. Of
    those, a path is reported when it:

    - contains the issue ID,
    - lives under a common or configured source/test directory,
    - is a ``thoughts/`` file, or
    - is an issue-directory file that does not carry another worker's issue ID.

    Args:
        issue_id: ID of the issue being processed (for pattern matching)
        baseline_status: Set of file paths from git status before worker
            started (as returned by ``_get_main_repo_baseline``)

    Returns:
        List of file paths that were leaked to main repo; empty if
        ``git status`` fails
    """
    # Get current status of main repo
    result = self._git_lock.run(
        ["status", "--porcelain"],
        cwd=self.repo_path,
        timeout=30,
    )

    if result.returncode != 0:
        return []

    # Parse exactly like the baseline so unchanged entries compare equal
    current_files = self._parse_porcelain_paths(result.stdout)

    # Find new files that appeared during worker execution
    new_files = current_files - baseline_status

    # Filter to files likely related to this issue
    issue_id_lower = issue_id.lower()
    leaked_files: list[str] = []

    # Build source prefix list: start with common fallbacks, then add configured dirs
    source_prefixes = ["backend/", "src/", "lib/", "tests/"]
    for dir_path in [self.br_config.project.src_dir, self.br_config.project.test_dir]:
        if dir_path:
            normalized = dir_path.rstrip("/") + "/"
            if normalized not in source_prefixes:
                source_prefixes.append(normalized)
    source_prefix_tuple = tuple(source_prefixes)

    for file_path in new_files:
        # Skip state file (managed by orchestrator)
        if file_path.endswith(".parallel-manage-state.json"):
            continue
        # Skip .gitignore (may be modified by ll-parallel)
        if file_path == ".gitignore":
            continue

        # Check if file is related to this issue
        file_lower = file_path.lower()
        if issue_id_lower in file_lower:
            leaked_files.append(file_path)
        # Also catch source files that shouldn't be modified in main
        elif file_path.startswith(source_prefix_tuple):
            leaked_files.append(file_path)
        # Catch thoughts/plans files
        elif file_path.startswith("thoughts/"):
            leaked_files.append(file_path)
        # Catch issue files in any issue directory variant
        # Handles both .issues/ (with dot) and issues/ (without dot)
        # Only include files without a different issue ID - files WITH other issue IDs
        # belong to other workers running in parallel (cross-worker contamination)
        elif file_path.startswith((".issues/", "issues/")):
            if not self._has_other_issue_id(file_lower, issue_id_lower):
                leaked_files.append(file_path)

    return leaked_files

def _cleanup_leaked_files(self, leaked_files: list[str]) -> int:
    """Discard leaked files from main repo working directory.

    Claude Code sometimes writes files to the main repo instead of the
    worktree. These files cause stash conflicts during merge operations.
    Since the actual work is preserved in the worktree branch, we can
    safely discard these leaked changes from the main repo.

    Files are handled according to ``git status``:

    - tracked files are restored with ``git checkout --``;
    - untracked files are deleted;
    - files git status does not report (e.g. gitignored files) are deleted
      directly.

    If ``git status`` itself fails, the files cannot be classified. Nothing
    is touched in that case, because the direct-delete fallback would
    otherwise delete tracked files instead of restoring them.

    Args:
        leaked_files: List of file paths leaked to main repo

    Returns:
        Number of files successfully cleaned up
    """
    if not leaked_files:
        return 0

    cleaned = 0

    # Get status to determine which files are tracked vs untracked
    status_result = self._git_lock.run(
        ["status", "--porcelain", "--"] + leaked_files,
        cwd=self.repo_path,
        timeout=30,
    )

    if status_result.returncode != 0:
        self.logger.warning(
            f"Skipping leaked file cleanup, git status failed: {status_result.stderr}"
        )
        return 0

    tracked_files: list[str] = []
    untracked_files: list[str] = []

    for line in status_result.stdout.splitlines():
        if not line or len(line) < 3:
            continue
        status_code = line[:2]
        file_path = line[3:].split(" -> ")[-1].strip()

        if status_code.startswith("?"):
            # Untracked file - need to delete
            untracked_files.append(file_path)
        else:
            # Tracked file - can use git checkout to discard
            tracked_files.append(file_path)

    # Discard changes to tracked files
    if tracked_files:
        checkout_result = self._git_lock.run(
            ["checkout", "--"] + tracked_files,
            cwd=self.repo_path,
            timeout=30,
        )
        if checkout_result.returncode == 0:
            cleaned += len(tracked_files)
        else:
            self.logger.warning(
                f"Failed to discard tracked leaked files: {checkout_result.stderr}"
            )

    # Delete untracked files
    for file_path in untracked_files:
        full_path = self.repo_path / file_path
        try:
            if full_path.exists():
                full_path.unlink()
                cleaned += 1
        except OSError as e:
            self.logger.warning(f"Failed to delete leaked file {file_path}: {e}")

    # Fallback: directly delete files not reported by git status
    # This handles gitignored files that git status --porcelain doesn't show
    accounted_files = set(tracked_files + untracked_files)
    for file_path in leaked_files:
        if file_path not in accounted_files:
            full_path = self.repo_path / file_path
            if full_path.exists():
                try:
                    full_path.unlink()
                    cleaned += 1
                    self.logger.info(f"Deleted gitignored leaked file: {file_path}")
                except OSError as e:
                    self.logger.warning(
                        f"Failed to delete gitignored leaked file {file_path}: {e}"
                    )
            else:
                self.logger.debug(f"Leaked file not found (may have been moved): {file_path}")

    if cleaned > 0:
        self.logger.info(f"Cleaned up {cleaned} leaked file(s) from main repo")

    return cleaned

def _get_main_repo_baseline(self) -> set[str]:
    """Get baseline of modified/untracked files in main repo.

    Uses the same porcelain parser as ``_detect_main_repo_leaks`` so that
    entries unchanged between the two snapshots compare equal.

    Returns:
        Set of file paths currently showing in git status; empty if
        ``git status`` fails
    """
    result = self._git_lock.run(
        ["status", "--porcelain"],
        cwd=self.repo_path,
        timeout=30,
    )

    if result.returncode != 0:
        return set()

    return self._parse_porcelain_paths(result.stdout)

1206 

def _get_main_head_sha(self) -> str:
    """Return the commit SHA at the main repository's HEAD.

    Returns:
        The stripped output of ``git rev-parse HEAD`` run in the main repo,
        or an empty string when that command fails.
    """
    completed = self._git_lock.run(
        ["rev-parse", "HEAD"],
        cwd=self.repo_path,
        timeout=10,
    )
    return completed.stdout.strip() if completed.returncode == 0 else ""

1221 

def _detect_committed_leaks(self, baseline_head_sha: str) -> list[str]:
    """Find commits that appeared on the main repo's HEAD while the worker ran.

    If Claude commits in the main repository rather than in its worktree,
    main gains commits while the worktree stays clean. Comparing main's
    current HEAD with the SHA captured before the worker started reveals them.

    NOTE(review): every commit in ``baseline..HEAD`` is reported, whoever
    created it. Confirm callers rule out legitimate commits (e.g. merges done
    by the orchestrator) landing on main in the same window.

    Args:
        baseline_head_sha: Main HEAD SHA recorded before the worker started

    Returns:
        SHAs listed by ``git log baseline..HEAD`` (newest first). The list is
        empty when there is no baseline, HEAD is unchanged or unreadable, or
        ``git log`` fails.
    """
    if not baseline_head_sha:
        return []

    head_now = self._get_main_head_sha()
    unchanged = (not head_now) or head_now == baseline_head_sha
    if unchanged:
        return []

    log_result = self._git_lock.run(
        ["log", "--format=%H", f"{baseline_head_sha}..HEAD"],
        cwd=self.repo_path,
        timeout=30,
    )
    if log_result.returncode != 0:
        return []

    stripped = (entry.strip() for entry in log_result.stdout.split("\n"))
    return [sha for sha in stripped if sha]

1255 

def _recover_committed_leaks(
    self,
    leaked_commits: list[str],
    worktree_path: Path,
    baseline_head_sha: str,
    issue_id: str,
) -> bool:
    """Attempt to recover committed leaks by cherry-picking to worktree.

    When Claude commits directly to main instead of the worktree branch,
    we attempt to:
    1. Cherry-pick the leaked commits onto the worktree branch
    2. Remove the leaked commits from main, if it is safe to do so

    If main's HEAD is still the newest leaked commit, main is moved back to
    the baseline with ``git reset --keep``. Unlike ``--hard``, ``--keep``
    refuses to run rather than discard uncommitted changes in the main
    repo's working tree, so pre-existing local work there is not thrown away.
    If main has advanced past the leaked commits, a ``rebase --onto`` is
    attempted to excise only the leaked range; a failed rebase is aborted.

    Args:
        leaked_commits: Commit SHAs that leaked to main (newest first)
        worktree_path: Path to the worker's worktree
        baseline_head_sha: Main HEAD SHA before worker started
        issue_id: Issue ID for logging

    Returns:
        True if every leaked commit was cherry-picked onto the worktree
        branch (cleaning up main is attempted but not required for True).
        False if a cherry-pick failed or ``leaked_commits`` is empty.
    """
    if not leaked_commits:
        # Nothing to recover; also avoids indexing an empty list below
        return False

    self.logger.info(
        f"[{issue_id}] Attempting recovery: cherry-picking {len(leaked_commits)} "
        f"commit(s) to worktree"
    )

    # Cherry-pick in chronological order (oldest first = reverse of log output)
    for sha in reversed(leaked_commits):
        result = subprocess.run(
            ["git", "cherry-pick", sha],
            cwd=worktree_path,
            capture_output=True,
            text=True,
            timeout=60,
        )
        if result.returncode != 0:
            subprocess.run(
                ["git", "cherry-pick", "--abort"],
                cwd=worktree_path,
                capture_output=True,
                timeout=10,
            )
            self.logger.warning(
                f"[{issue_id}] Cherry-pick of {sha[:8]} failed: {result.stderr.strip()}"
            )
            return False

    # Attempt to remove the leaked commits from main
    current_main_sha = self._get_main_head_sha()
    most_recent_leaked = leaked_commits[0]  # Newest first
    if not current_main_sha:
        self.logger.warning(
            f"[{issue_id}] Could not read main HEAD — leaked commits left on main, "
            f"manual cleanup required"
        )
    elif current_main_sha == most_recent_leaked:
        # --keep moves HEAD back but aborts instead of clobbering local changes
        reset_result = self._git_lock.run(
            ["reset", "--keep", baseline_head_sha],
            cwd=self.repo_path,
            timeout=30,
        )
        if reset_result.returncode == 0:
            self.logger.info(f"[{issue_id}] Reset main to baseline {baseline_head_sha[:8]}")
        else:
            self.logger.warning(
                f"[{issue_id}] Cherry-pick succeeded but failed to reset main: "
                f"{reset_result.stderr.strip()}"
            )
    else:
        # main has advanced past the leaked commits — attempt surgical rebase
        # to excise only the leaked commits while preserving subsequent work
        self.logger.info(
            f"[{issue_id}] Main has advanced beyond leaked commits "
            f"({current_main_sha[:8]} != {most_recent_leaked[:8]}) — "
            f"attempting surgical rebase to excise leaked commits"
        )
        rebase_result = self._git_lock.run(
            ["rebase", "--onto", baseline_head_sha, most_recent_leaked],
            cwd=self.repo_path,
            timeout=60,
        )
        if rebase_result.returncode == 0:
            self.logger.info(f"[{issue_id}] Surgically removed leaked commits via rebase")
        else:
            self._git_lock.run(
                ["rebase", "--abort"],
                cwd=self.repo_path,
                timeout=10,
            )
            self.logger.warning(
                f"[{issue_id}] Surgical rebase failed — manual cleanup required: "
                f"{rebase_result.stderr.strip()}"
            )

    self.logger.info(
        f"[{issue_id}] Recovered {len(leaked_commits)} commit(s): "
        f"cherry-picked to worktree branch"
    )
    return True

1356 

@property
def active_count(self) -> int:
    """How many workers are still in flight.

    A worker counts while its future has not finished. It also counts after
    the future finishes but while its completion callback is still pending
    (tracked in ``_pending_callbacks``).
    """
    with self._process_lock:
        running = len([fut for fut in self._active_workers.values() if not fut.done()])
    with self._callback_lock:
        awaiting_callback = len(self._pending_callbacks)
    return running + awaiting_callback

1369 

def set_worker_stage(self, issue_id: str, stage: WorkerStage) -> None:
    """Record ``stage`` as the current stage of the worker handling ``issue_id``.

    Args:
        issue_id: Issue whose worker changed stage
        stage: Stage the worker has entered
    """
    with self._process_lock:
        self._worker_stages.update({issue_id: stage})

1379 

def get_worker_stage(self, issue_id: str) -> WorkerStage | None:
    """Look up the stage recorded for the worker handling ``issue_id``.

    Args:
        issue_id: Issue whose worker stage is wanted

    Returns:
        The recorded stage, or None when the issue is not tracked
    """
    with self._process_lock:
        stage = self._worker_stages.get(issue_id)
    return stage

1391 

def get_active_stages(self) -> dict[str, WorkerStage]:
    """Snapshot the stages of workers that are currently active.

    Stage entries whose issue is no longer in ``_active_workers`` are left
    out of the result.

    Returns:
        Mapping of issue_id to recorded stage, restricted to active workers
    """
    with self._process_lock:
        live_ids = self._active_workers.keys()
        return {
            key: value
            for key, value in self._worker_stages.items()
            if key in live_ids
        }

1406 

def remove_worker_stage(self, issue_id: str) -> None:
    """Stop tracking the stage of the worker handling ``issue_id``.

    Removing an issue that is not tracked is a no-op.

    Args:
        issue_id: Issue to forget
    """
    with self._process_lock:
        if issue_id in self._worker_stages:
            del self._worker_stages[issue_id]

1415 

def cleanup_all_worktrees(self) -> None:
    """Remove every little-loops worker worktree under the configured base.

    Only subdirectories whose names ``_is_ll_worktree`` accepts are passed to
    ``_cleanup_worktree``. Does nothing when the base directory is missing.
    """
    base_dir = self.repo_path / self.parallel_config.worktree_base
    if base_dir.exists():
        from little_loops.worktree_utils import _is_ll_worktree

        for entry in base_dir.iterdir():
            if not entry.is_dir():
                continue
            if _is_ll_worktree(entry.name):
                self._cleanup_worktree(entry)

        self.logger.info("Cleaned up all worker worktrees")