Coverage for little_loops / parallel / worker_pool.py: 89%
507 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""Worker pool for parallel issue processing with git worktree isolation.
3Each worker operates in an isolated git worktree, allowing concurrent issue
4processing without file conflicts.
5"""
7from __future__ import annotations
9import json
10import os
11import re
12import subprocess
13import sys
14import threading
15import time
16from collections.abc import Callable
17from concurrent.futures import Future, ThreadPoolExecutor
18from datetime import datetime
19from pathlib import Path
20from typing import TYPE_CHECKING, Any, cast
22from little_loops.host_runner import resolve_host
23from little_loops.output_parsing import parse_ready_issue_output
24from little_loops.parallel.git_lock import GitLock
25from little_loops.parallel.types import ParallelConfig, WorkerResult, WorkerStage
26from little_loops.subprocess_utils import (
27 assemble_guillotine_prompt,
28 detect_context_handoff,
29 read_continuation_prompt,
30 read_sentinel,
31 write_sentinel,
32)
33from little_loops.subprocess_utils import (
34 run_claude_command as _run_claude_base,
35)
36from little_loops.work_verification import EXCLUDED_DIRECTORIES, verify_work_was_done
38if TYPE_CHECKING:
39 from little_loops.config import BRConfig
40 from little_loops.issue_parser import IssueInfo
41 from little_loops.logger import Logger
44class WorkerPool:
45 """Thread pool for processing issues in isolated git worktrees.
47 Each worker:
48 1. Creates a dedicated git worktree and branch
49 2. Runs issue validation and implementation via Claude CLI
50 3. Commits changes locally
51 4. Returns results for merge coordination
53 Example:
54 >>> pool = WorkerPool(parallel_config, br_config, logger)
55 >>> pool.start()
56 >>> future = pool.submit(issue_info)
57 >>> result = future.result() # WorkerResult
58 >>> pool.shutdown()
59 """
61 def __init__(
62 self,
63 parallel_config: ParallelConfig,
64 br_config: BRConfig,
65 logger: Logger,
66 repo_path: Path | None = None,
67 git_lock: GitLock | None = None,
68 ) -> None:
69 """Initialize the worker pool.
71 Args:
72 parallel_config: Parallel processing configuration
73 br_config: Project configuration (for category actions)
74 logger: Logger for worker output
75 repo_path: Path to the git repository (default: current directory)
76 git_lock: Shared lock for git operations (created if not provided)
77 """
78 self.parallel_config = parallel_config
79 self.br_config = br_config
80 self.logger = logger
81 self.repo_path = repo_path or Path.cwd()
82 self._git_lock = git_lock or GitLock(logger)
83 self._executor: ThreadPoolExecutor | None = None
84 self._active_workers: dict[str, Future[WorkerResult]] = {}
85 # Track active subprocesses for forceful termination on shutdown
86 self._active_processes: dict[str, subprocess.Popen[str]] = {}
87 # Track active worktree paths to prevent cleanup while in use (BUG-142)
88 self._active_worktrees: set[Path] = set()
89 self._process_lock = threading.Lock()
90 # Track callbacks currently executing
91 self._pending_callbacks: set[str] = set()
92 self._callback_lock = threading.Lock()
93 # Shutdown tracking for interrupted worker detection (ENH-036)
94 self._shutdown_requested = False
95 self._terminated_during_shutdown: set[str] = set()
96 # Track worker processing stages for progress visibility (ENH-262)
97 self._worker_stages: dict[str, WorkerStage] = {}
99 def start(self) -> None:
100 """Start the worker pool."""
101 if self._executor is not None:
102 return
104 # Ensure worktree base directory exists
105 worktree_base = self.repo_path / self.parallel_config.worktree_base
106 worktree_base.mkdir(parents=True, exist_ok=True)
108 self._executor = ThreadPoolExecutor(
109 max_workers=self.parallel_config.max_workers,
110 thread_name_prefix="issue-worker",
111 )
112 self.logger.info(f"Worker pool started with {self.parallel_config.max_workers} workers")
114 def shutdown(self, wait: bool = True) -> None:
115 """Shutdown the worker pool.
117 Args:
118 wait: Whether to wait for pending tasks to complete
119 """
120 if self._executor is None:
121 return
123 self.logger.info("Shutting down worker pool...")
125 # First, terminate all active subprocesses to unblock worker threads
126 if not wait:
127 self.terminate_all_processes()
129 self._executor.shutdown(wait=wait)
130 self._executor = None
132 def set_shutdown_requested(self, value: bool = True) -> None:
133 """Set the shutdown flag.
135 Called by orchestrator during shutdown to enable tracking of
136 workers that are terminated due to shutdown vs. actual failures.
137 """
138 self._shutdown_requested = value
140 def terminate_all_processes(self) -> None:
141 """Forcefully terminate all active subprocesses.
143 Called when we need to abort workers immediately,
144 such as on timeout or shutdown.
145 """
146 with self._process_lock:
147 for issue_id, process in list(self._active_processes.items()):
148 if process.poll() is None: # Still running
149 self.logger.warning(
150 f"Terminating subprocess for {issue_id} (PID {process.pid})"
151 )
152 # Track issues terminated during shutdown for interrupted detection (ENH-036)
153 if self._shutdown_requested:
154 self._terminated_during_shutdown.add(issue_id)
155 try:
156 # Send SIGTERM first for graceful termination
157 process.terminate()
158 try:
159 process.wait(timeout=5)
160 except subprocess.TimeoutExpired:
161 # Force kill if SIGTERM didn't work
162 self.logger.warning(f"Force killing {issue_id} (PID {process.pid})")
163 process.kill()
164 process.wait(timeout=2)
165 except Exception as e:
166 self.logger.error(f"Failed to terminate {issue_id}: {e}")
167 self._active_processes.clear()
169 def submit(
170 self,
171 issue: IssueInfo,
172 on_complete: Callable[[WorkerResult], None] | None = None,
173 ) -> Future[WorkerResult]:
174 """Submit an issue for processing.
176 Args:
177 issue: Issue to process
178 on_complete: Optional callback when processing completes
180 Returns:
181 Future that will contain the WorkerResult
182 """
183 if self._executor is None:
184 raise RuntimeError("Worker pool not started")
186 future = self._executor.submit(self._process_issue, issue)
187 with self._process_lock:
188 self._active_workers[issue.issue_id] = future
190 if on_complete:
191 future.add_done_callback(
192 lambda f: self._handle_completion(f, on_complete, issue.issue_id)
193 )
195 return future
def _handle_completion(
    self,
    future: Future[WorkerResult],
    callback: Callable[[WorkerResult], None],
    issue_id: str,
) -> None:
    """Done-callback: record the worker's final stage and forward its result.

    ``issue_id`` is listed in ``_pending_callbacks`` for the duration of this
    call. A future that raised is turned into a failed WorkerResult, so the
    callback always receives a result. Exceptions raised by the callback are
    logged, not propagated.
    """
    with self._callback_lock:
        self._pending_callbacks.add(issue_id)
    try:
        try:
            result = future.result()
        except Exception as exc:
            self.logger.error(f"Worker future failed for {issue_id}: {exc}")
            result = WorkerResult(
                issue_id=issue_id,
                success=False,
                branch_name="",
                worktree_path=Path(),
                error=f"Worker future failed: {exc}",
            )

        # Record the terminal stage for progress tracking (ENH-262)
        if result.success:
            final_stage = WorkerStage.COMPLETED
        elif result.interrupted:
            final_stage = WorkerStage.INTERRUPTED
        else:
            final_stage = WorkerStage.FAILED
        self.set_worker_stage(issue_id, final_stage)

        try:
            callback(result)
        except Exception as exc:
            self.logger.error(f"Worker completion callback failed for {issue_id}: {exc}")
    finally:
        with self._callback_lock:
            self._pending_callbacks.discard(issue_id)
def _process_issue(self, issue: IssueInfo) -> WorkerResult:
    """Process a single issue in an isolated worktree.

    Runs the per-issue pipeline:

    1. Create the worktree and branch.
    2. Validate with ready-issue.
    3. Optionally run decide-issue.
    4. Implement with manage-issue (with continuation support).
    5. Collect changed files and clean up / recover work that leaked into the
       main repo.
    6. Verify that real work was done.
    7. Rebase the branch onto the configured base branch.

    Every outcome is returned as a WorkerResult, including CLOSE, BLOCKED and
    NOT_READY verdicts and shutdown interruptions. Exceptions raised inside
    the main ``try`` become a failed WorkerResult instead of propagating.

    Args:
        issue: Issue to process

    Returns:
        WorkerResult with processing outcome
    """
    start_time = time.time()
    # Second-resolution timestamp used in both the branch and worktree names
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    if self.parallel_config.use_feature_branches:
        from little_loops.issue_parser import slugify

        branch_name = f"feature/{issue.issue_id.lower()}-{slugify(issue.title)}"
    else:
        branch_name = f"parallel/{issue.issue_id.lower()}-{timestamp}"
    worktree_path = (
        self.repo_path
        / self.parallel_config.worktree_base
        / f"worker-{issue.issue_id.lower()}-{timestamp}"
    )

    # Set initial stage for progress tracking (ENH-262)
    self.set_worker_stage(issue.issue_id, WorkerStage.SETUP)

    # Capture baseline of main repo status before worker starts
    # Used to detect files incorrectly written to main repo
    baseline_status = self._get_main_repo_baseline()
    # Capture main HEAD SHA before worker starts to detect committed leaks
    baseline_head_sha = self._get_main_head_sha()

    try:
        # Step 1: Create worktree with new branch
        self._setup_worktree(worktree_path, branch_name)

        # Register worktree as active to prevent cleanup while in use (BUG-142)
        with self._process_lock:
            self._active_worktrees.add(worktree_path)

        # Update stage for progress tracking (ENH-262)
        self.set_worker_stage(issue.issue_id, WorkerStage.VALIDATING)

        # Step 2: Run ready-issue validation
        ready_cmd = self.parallel_config.get_ready_command(issue.issue_id)
        ready_result = self._run_claude_command(
            ready_cmd,
            worktree_path,
            issue_id=issue.issue_id,
        )

        # Check if worker was terminated during shutdown (ENH-036).
        # Checked before the returncode so a subprocess killed by
        # terminate_all_processes() is reported as interrupted, not as a
        # ready-issue failure.
        if issue.issue_id in self._terminated_during_shutdown:
            self.set_worker_stage(issue.issue_id, WorkerStage.INTERRUPTED)
            return WorkerResult(
                issue_id=issue.issue_id,
                success=False,
                interrupted=True,
                branch_name=branch_name,
                worktree_path=worktree_path,
                duration=time.time() - start_time,
                error="Interrupted during shutdown",
                stdout=ready_result.stdout,
                stderr=ready_result.stderr,
            )

        if ready_result.returncode != 0:
            # Prefer stderr; otherwise fall back to the first 500 chars of stdout
            err_detail = ready_result.stderr or (ready_result.stdout or "")[:500]
            return WorkerResult(
                issue_id=issue.issue_id,
                success=False,
                branch_name=branch_name,
                worktree_path=worktree_path,
                duration=time.time() - start_time,
                error=f"ready-issue failed: {err_detail}",
                stdout=ready_result.stdout,
                stderr=ready_result.stderr,
            )

        # Step 3: Parse ready-issue output and check verdict
        ready_parsed = parse_ready_issue_output(ready_result.stdout)

        # Handle CLOSE verdict - issue should not be implemented
        if ready_parsed.get("should_close"):
            return WorkerResult(
                issue_id=issue.issue_id,
                success=True,  # Closure is a valid outcome
                branch_name=branch_name,
                worktree_path=worktree_path,
                duration=time.time() - start_time,
                should_close=True,
                close_reason=ready_parsed.get("close_reason"),
                close_status=ready_parsed.get("close_status"),
                stdout=ready_result.stdout,
                stderr=ready_result.stderr,
            )

        # Handle BLOCKED verdict - issue has open dependencies
        if ready_parsed.get("is_blocked"):
            return WorkerResult(
                issue_id=issue.issue_id,
                success=False,
                was_blocked=True,
                branch_name=branch_name,
                worktree_path=worktree_path,
                duration=time.time() - start_time,
                error="ready-issue verdict: BLOCKED - open dependency detected",
                stdout=ready_result.stdout,
                stderr=ready_result.stderr,
            )

        # Handle NOT_READY verdict (and any other non-ready verdict)
        if not ready_parsed["is_ready"]:
            concerns = ready_parsed.get("concerns", [])
            if concerns:
                concern_msg = "; ".join(concerns)
            elif ready_parsed["verdict"] == "UNKNOWN":
                # For UNKNOWN verdicts, show a snippet of output for debugging
                raw_out = (ready_result.stdout or "")[:200].strip()
                concern_msg = (
                    f"Could not parse verdict. Output: {raw_out}..."
                    if raw_out
                    else "No output from ready-issue"
                )
            else:
                concern_msg = "Issue not ready"
            return WorkerResult(
                issue_id=issue.issue_id,
                success=False,
                branch_name=branch_name,
                worktree_path=worktree_path,
                duration=time.time() - start_time,
                error=f"ready-issue verdict: {ready_parsed['verdict']} - {concern_msg}",
                stdout=ready_result.stdout,
                stderr=ready_result.stderr,
            )

        # Track if issue was corrected (corrections stay in worktree).
        # Only reported on the final success result below.
        was_corrected = ready_parsed.get("was_corrected", False)
        corrections = ready_parsed.get("corrections", [])

        # Update stage for progress tracking (ENH-262)
        self.set_worker_stage(issue.issue_id, WorkerStage.IMPLEMENTING)

        # Decision gate: invoke decide-issue when the issue requires a decision.
        # Strict identity check: only an explicit True triggers the gate, and
        # a failing decide-issue run is logged but does not stop the pipeline.
        if issue.decision_needed is True:
            decide_cmd = self.parallel_config.get_decide_command(issue.issue_id)
            decide_result = self._run_claude_command(
                decide_cmd, worktree_path, issue_id=issue.issue_id
            )
            if decide_result.returncode != 0:
                self.logger.warning(
                    f"[{issue.issue_id}] decide-issue command failed, "
                    "continuing to implementation anyway..."
                )

        # Step 4: Get action from BRConfig
        action = self.br_config.get_category_action(issue.issue_type)

        # Step 5: Run manage-issue implementation (with continuation support)
        manage_cmd = self.parallel_config.get_manage_command(
            issue.issue_type, action, issue.issue_id
        )
        manage_result = self._run_with_continuation(
            manage_cmd,
            worktree_path,
            issue_id=issue.issue_id,
        )

        # Update stage for progress tracking (ENH-262)
        self.set_worker_stage(issue.issue_id, WorkerStage.VERIFYING)

        # Check if worker was terminated during shutdown (ENH-036)
        if issue.issue_id in self._terminated_during_shutdown:
            self.set_worker_stage(issue.issue_id, WorkerStage.INTERRUPTED)
            return WorkerResult(
                issue_id=issue.issue_id,
                success=False,
                interrupted=True,
                branch_name=branch_name,
                worktree_path=worktree_path,
                duration=time.time() - start_time,
                error="Interrupted during shutdown",
                stdout=manage_result.stdout,
                stderr=manage_result.stderr,
            )

        # Step 6: Get list of changed files in worktree
        changed_files = self._get_changed_files(worktree_path)

        # Step 7: Detect files leaked to main repo instead of worktree (unstaged)
        leaked_files = self._detect_main_repo_leaks(issue.issue_id, baseline_status)
        if leaked_files:
            self.logger.warning(
                f"{issue.issue_id} leaked {len(leaked_files)} file(s) to main repo: "
                f"{leaked_files}"
            )
            # Clean up leaked files to prevent stash conflicts during merge.
            # The actual work is preserved in the worktree branch.
            self._cleanup_leaked_files(leaked_files)

        # Step 7b: Detect commits made directly to main instead of worktree branch.
        # If Claude committed to main (not the worktree), worktree will have no diff,
        # causing work verification to fail. Attempt to recover by cherry-picking
        # the leaked commits to the worktree and resetting main. (BUG-580)
        # Recovery is only attempted when the worktree itself shows no changes.
        committed_leaks = self._detect_committed_leaks(baseline_head_sha)
        if committed_leaks:
            self.logger.warning(
                f"{issue.issue_id} committed {len(committed_leaks)} commit(s) directly "
                f"to main instead of worktree: {[sha[:8] for sha in committed_leaks]}"
            )
            if not changed_files:
                recovered = self._recover_committed_leaks(
                    committed_leaks, worktree_path, baseline_head_sha, issue.issue_id
                )
                if recovered:
                    changed_files = self._get_changed_files(worktree_path)

        # Step 8: Verify actual work was done (after potential committed-leak recovery)
        # Pass full filename for better doc-only keyword matching
        issue_filename = issue.path.stem if issue.path else ""
        work_verified, verification_error = self._verify_work_was_done(
            changed_files, issue.issue_id, issue_filename
        )

        # A failing manage-issue run takes precedence over the verification result
        if manage_result.returncode != 0:
            err_detail = manage_result.stderr or (manage_result.stdout or "")[:500]
            return WorkerResult(
                issue_id=issue.issue_id,
                success=False,
                branch_name=branch_name,
                worktree_path=worktree_path,
                changed_files=changed_files,
                leaked_files=leaked_files,
                duration=time.time() - start_time,
                error=f"manage-issue failed: {err_detail}",
                stdout=manage_result.stdout,
                stderr=manage_result.stderr,
            )

        if not work_verified:
            return WorkerResult(
                issue_id=issue.issue_id,
                success=False,
                branch_name=branch_name,
                worktree_path=worktree_path,
                changed_files=changed_files,
                leaked_files=leaked_files,
                duration=time.time() - start_time,
                error=verification_error,
                stdout=manage_result.stdout,
                stderr=manage_result.stderr,
            )

        # Step 9: Update branch base before merge (BUG-180)
        # Fetch the configured remote/base branch and rebase onto it
        base_updated, base_error = self._update_branch_base(worktree_path, issue.issue_id)

        # Update stage for progress tracking (ENH-262)
        self.set_worker_stage(issue.issue_id, WorkerStage.MERGING)

        if not base_updated:
            return WorkerResult(
                issue_id=issue.issue_id,
                success=False,
                branch_name=branch_name,
                worktree_path=worktree_path,
                changed_files=changed_files,
                leaked_files=leaked_files,
                duration=time.time() - start_time,
                error=base_error,
                stdout=manage_result.stdout,
                stderr=manage_result.stderr,
            )

        return WorkerResult(
            issue_id=issue.issue_id,
            success=True,
            branch_name=branch_name,
            worktree_path=worktree_path,
            changed_files=changed_files,
            leaked_files=leaked_files,
            duration=time.time() - start_time,
            error=None,
            stdout=manage_result.stdout,
            stderr=manage_result.stderr,
            was_corrected=was_corrected,
            corrections=corrections,
        )

    except Exception as e:
        # Any exception raised above becomes a failed result for this issue
        return WorkerResult(
            issue_id=issue.issue_id,
            success=False,
            branch_name=branch_name,
            worktree_path=worktree_path,
            duration=time.time() - start_time,
            error=str(e),
        )
    finally:
        # Unregister worktree as no longer active (BUG-142).
        # discard() is safe even if setup failed before registration.
        with self._process_lock:
            self._active_worktrees.discard(worktree_path)
def _setup_worktree(self, worktree_path: Path, branch_name: str) -> None:
    """Create the worker's git worktree on a newly created branch.

    Delegates to ``worktree_utils.setup_worktree`` and passes it the shared
    git lock. When ``show_model`` is enabled, also probes which model the
    Claude CLI will use in the new worktree and logs the outcome.

    Args:
        worktree_path: Directory to create the worktree in
        branch_name: Name of the branch to create for it
    """
    from little_loops.worktree_utils import setup_worktree

    cfg = self.parallel_config
    setup_worktree(
        repo_path=self.repo_path,
        worktree_path=worktree_path,
        branch_name=branch_name,
        copy_files=cfg.worktree_copy_files,
        logger=self.logger,
        git_lock=self._git_lock,
    )

    if not cfg.show_model:
        return

    # The probe makes a real API call, hence it is gated behind --show-model
    detected = self._detect_worktree_model_via_api(worktree_path)
    if detected:
        self.logger.info(f" Using model: {detected}")
    else:
        self.logger.warning(" Could not detect Claude CLI model")
def _detect_worktree_model_via_api(self, worktree_path: Path) -> str | None:
    """Detect the model Claude will use by making an API call.

    Runs a minimal Claude command with JSON output and returns the first key
    of the response's ``modelUsage`` object. This verifies that
    settings.local.json is being respected.

    This is a best-effort diagnostic probe. A timeout, a failure to launch the
    CLI, unparseable output, or valid JSON of an unexpected shape all yield
    None rather than an exception, so the optional --show-model check cannot
    abort worktree setup.

    Args:
        worktree_path: Path to the worktree to test

    Returns:
        Model name (e.g., "claude-sonnet-4-20250514") or None if unable to detect
    """
    try:
        invocation = resolve_host().build_blocking_json(prompt="reply with just 'ok'")
        # No-perm-skip preserved: this is a detection probe, not a real run.
        args = [a for a in invocation.args if a != "--dangerously-skip-permissions"]

        # Set environment to keep Claude in the project working directory (BUG-007)
        # This ensures the first Claude CLI invocation in the worktree has the same
        # project root behavior as subsequent invocations via run_claude_command()
        env = os.environ.copy()
        env["CLAUDE_BASH_MAINTAIN_PROJECT_WORKING_DIR"] = "1"
        env.update(invocation.env)

        result = subprocess.run(
            [invocation.binary, *args],
            cwd=worktree_path,
            capture_output=True,
            text=True,
            timeout=30,
            env=env,
        )
    except (subprocess.TimeoutExpired, OSError):
        # OSError covers FileNotFoundError (CLI not installed) as well as
        # PermissionError and other failures to launch the probe.
        return None

    output = result.stdout.strip()
    if result.returncode != 0 or not output:
        return None

    try:
        data: Any = json.loads(output)
    except json.JSONDecodeError:
        return None

    # Valid JSON of an unexpected shape (e.g. a top-level list, or a
    # non-object "modelUsage") would otherwise raise AttributeError and
    # propagate out of _setup_worktree.
    if not isinstance(data, dict):
        return None
    model_usage = data.get("modelUsage")
    if not isinstance(model_usage, dict) or not model_usage:
        return None
    # Return the first (primary) model from modelUsage
    return cast(str, next(iter(model_usage)))
605 def _cleanup_worktree(self, worktree_path: Path) -> None:
606 """Remove a git worktree and its associated branch.
608 Args:
609 worktree_path: Path to the worktree to remove
610 """
611 if not worktree_path.exists():
612 return
614 # Skip cleanup if worktree is actively in use by a running worker (BUG-142)
615 with self._process_lock:
616 if worktree_path in self._active_worktrees:
617 self.logger.warning(
618 f"Skipping cleanup of {worktree_path.name}: worktree is in active use"
619 )
620 return
622 # Only delete branches with the parallel/ prefix (legacy behavior for ll-parallel)
623 branch_result = subprocess.run(
624 ["git", "rev-parse", "--abbrev-ref", "HEAD"],
625 cwd=worktree_path,
626 capture_output=True,
627 text=True,
628 )
629 branch_name = branch_result.stdout.strip() if branch_result.returncode == 0 else None
630 delete_branch = branch_name is not None and branch_name.startswith("parallel/")
632 from little_loops.worktree_utils import cleanup_worktree
634 cleanup_worktree(
635 worktree_path=worktree_path,
636 repo_path=self.repo_path,
637 logger=self.logger,
638 git_lock=self._git_lock,
639 delete_branch=delete_branch,
640 )
def _run_claude_command(
    self,
    command: str,
    working_dir: Path,
    issue_id: str | None = None,
    on_usage: Callable[[int, int], None] | None = None,
    resume_session: bool = False,
) -> subprocess.CompletedProcess[str]:
    """Run a Claude CLI command, optionally streaming its output live.

    When ``issue_id`` is given, the child process is registered in
    ``_active_processes`` for its lifetime so ``terminate_all_processes``
    can kill it.

    Args:
        command: The command to run (e.g., "/ll:ready-issue BUG-123")
        working_dir: Directory to run the command in
        issue_id: Optional issue ID for subprocess tracking
        on_usage: Optional usage callback for token tracking
        resume_session: If True, passes --continue to the Claude CLI

    Returns:
        CompletedProcess with stdout and stderr
    """
    cfg = self.parallel_config
    stream_output = cfg.stream_subprocess_output
    # Only consulted when issue_id is truthy (the hooks are not installed otherwise)
    tracked_id = issue_id or ""

    def emit_line(line: str, is_stderr: bool) -> None:
        # Installed only when streaming is enabled: stderr goes straight to the
        # console, stdout goes through the logger.
        if is_stderr:
            print(f" {line}", file=sys.stderr)
        else:
            self.logger.info(f" {line}")

    def track_process(proc: subprocess.Popen[str]) -> None:
        with self._process_lock:
            self._active_processes[tracked_id] = proc

    def untrack_process(proc: subprocess.Popen[str]) -> None:
        with self._process_lock:
            self._active_processes.pop(tracked_id, None)

    return _run_claude_base(
        command=command,
        timeout=cfg.timeout_per_issue,
        working_dir=working_dir,
        stream_callback=emit_line if stream_output else None,
        on_process_start=track_process if issue_id else None,
        on_process_end=untrack_process if issue_id else None,
        idle_timeout=cfg.idle_timeout_per_issue,
        on_usage=on_usage,
        resume_session=resume_session,
    )
def _run_with_continuation(
    self,
    command: str,
    working_dir: Path,
    issue_id: str | None = None,
    max_continuations: int = 3,
    context_limit: int = 200_000,
    sentinel_threshold: float = 0.60,
    guillotine_threshold: float = 0.90,
) -> subprocess.CompletedProcess[str]:
    """Run a Claude command with automatic continuation on context handoff.

    Mirrors the E+G+J logic in issue_manager.run_with_continuation. After
    each session the following are checked in order:

    * Handoff: stdout contains CONTEXT_HANDOFF. Re-run as
      ``"<command> --resume"``. The run fails (returncode 1) if no
      continuation prompt was written.
    * Option J (guillotine): stderr reports "prompt is too long", or usage
      reached ``guillotine_threshold``. Start a fresh session seeded with a
      summary prompt built from all output so far.
    * Option E: a sentinel from an earlier session exists. Resume the
      session with an explicit instruction to run /ll:handoff now.
    * Option G: usage reached ``sentinel_threshold``. Write a sentinel,
      then stop.

    Token counters are reset before every Claude invocation, so each
    threshold check only sees usage reported by the session that just ran.

    Args:
        command: The command to run
        working_dir: Directory (worktree) to run the command in
        issue_id: Optional issue ID for subprocess tracking
        max_continuations: Maximum number of continuation attempts
        context_limit: Context window size in tokens
        sentinel_threshold: Write sentinel when usage >= this fraction
        guillotine_threshold: Trigger J-path when usage >= this fraction

    Returns:
        CompletedProcess with the last session's args and returncode, and with
        the stdout/stderr of all sessions joined by "---CONTINUATION---"
        separators
    """
    all_stdout: list[str] = []
    all_stderr: list[str] = []
    current_command = command
    continuation_count = 0
    result: subprocess.CompletedProcess[str] = subprocess.CompletedProcess(
        args=[], returncode=1, stdout="", stderr=""
    )
    tag = f"[{issue_id}]" if issue_id else "[worker]"

    # Token usage reported by the most recent Claude invocation, used by the
    # sentinel (G) and guillotine (J) thresholds. One-element lists so the
    # nested callbacks can mutate them.
    _last_input: list[int] = [0]
    _last_output: list[int] = [0]

    def _usage_tracker(input_tokens: int, output_tokens: int) -> None:
        _last_input[0] = input_tokens
        _last_output[0] = output_tokens

    def _reset_usage() -> None:
        _last_input[0] = 0
        _last_output[0] = 0

    while continuation_count <= max_continuations:
        # Reset before every session, including the --resume sessions started
        # by the CONTEXT_HANDOFF path. Otherwise a session that reports no
        # usage would be judged against the previous session's token counts.
        _reset_usage()
        result = self._run_claude_command(
            current_command,
            working_dir,
            issue_id=issue_id,
            on_usage=_usage_tracker,
        )

        all_stdout.append(result.stdout)
        all_stderr.append(result.stderr)

        # Standard path: Claude emitted CONTEXT_HANDOFF
        if detect_context_handoff(result.stdout):
            self.logger.info(f"{tag} Detected CONTEXT_HANDOFF signal")

            prompt_content = read_continuation_prompt(working_dir)
            if not prompt_content:
                self.logger.warning(
                    f"{tag} Context handoff signaled but no continuation prompt found"
                )
                all_stderr.append("Handoff detected but no continuation prompt found")
                # Report failure even if the session itself exited 0
                result = subprocess.CompletedProcess(
                    args=result.args, returncode=1, stdout=result.stdout, stderr=result.stderr
                )
                break

            if continuation_count >= max_continuations:
                self.logger.warning(
                    f"{tag} Reached max continuations ({max_continuations}), stopping"
                )
                break

            continuation_count += 1
            self.logger.info(f"{tag} Starting continuation session #{continuation_count}")
            current_command = f"{command} --resume"
            continue

        total_tokens = _last_input[0] + _last_output[0]
        usage_ratio = total_tokens / context_limit if context_limit > 0 else 0.0
        prompt_too_long = "prompt is too long" in (result.stderr or "").lower()

        # Option J: guillotine — fresh session with transcript-summary prompt
        if (
            prompt_too_long or usage_ratio >= guillotine_threshold
        ) and continuation_count < max_continuations:
            trigger_reason = (
                "Prompt is too long" if prompt_too_long else f"usage {usage_ratio:.0%}"
            )
            self.logger.warning(
                f"{tag} Option J triggered ({trigger_reason}): spawning fresh session"
            )
            try:
                guillotine_cmd = assemble_guillotine_prompt(
                    original_command=command,
                    captured_stdout="\n---CONTINUATION---\n".join(all_stdout),
                    token_stats={
                        "input_tokens": _last_input[0],
                        "output_tokens": _last_output[0],
                        "context_limit": context_limit,
                        "trigger_reason": trigger_reason,
                    },
                )
            except Exception as exc:
                self.logger.warning(
                    f"{tag} Failed to assemble guillotine prompt ({exc}), using bare restart"
                )
                guillotine_cmd = command
            continuation_count += 1
            current_command = guillotine_cmd
            continue  # usage counters are reset at the top of the loop

        # Option E: read sentinel from a PREVIOUS session (must run before G writes
        # the current-session sentinel to avoid immediately consuming our own write).
        sentinel_data = read_sentinel(working_dir)
        if sentinel_data is not None and continuation_count < max_continuations:
            usage_pct = sentinel_data.get("usage_percent", int(usage_ratio * 100))
            self.logger.info(
                f"{tag} Sentinel detected ({usage_pct}% context used): "
                "sending explicit handoff instruction"
            )
            continuation_count += 1
            explicit_handoff_instruction = (
                f"Context limit is approaching ({usage_pct}% of the context window is used). "
                "Please run /ll:handoff RIGHT NOW to save your progress to "
                ".ll/ll-continue-prompt.md, then output "
                '"CONTEXT_HANDOFF: Ready for fresh session" to signal continuation.'
            )
            _reset_usage()
            result = self._run_claude_command(
                explicit_handoff_instruction,
                working_dir,
                issue_id=issue_id,
                on_usage=_usage_tracker,
                resume_session=True,
            )
            all_stdout.append(result.stdout)
            all_stderr.append(result.stderr)

            if detect_context_handoff(result.stdout):
                self.logger.info(
                    f"{tag} CONTEXT_HANDOFF detected after explicit handoff instruction"
                )
                prompt_content = read_continuation_prompt(working_dir)
                if prompt_content and continuation_count < max_continuations:
                    continuation_count += 1
                    self.logger.info(
                        f"{tag} Starting continuation session #{continuation_count}"
                    )
                    current_command = f"{command} --resume"
                    continue  # usage counters are reset at the top of the loop
            break

        # Option G (Python layer): write sentinel for the NEXT session.
        # Placed after E-path so we don't immediately consume our own write.
        if total_tokens > 0 and usage_ratio >= sentinel_threshold:
            self.logger.info(
                f"{tag} Writing context-handoff sentinel ({usage_ratio:.0%} context used)"
            )
            write_sentinel(working_dir, token_count=total_tokens, context_limit=context_limit)

        # No handoff signal, no prior-session sentinel, no overflow — done
        break

    return subprocess.CompletedProcess(
        args=result.args,
        returncode=result.returncode,
        stdout="\n---CONTINUATION---\n".join(all_stdout),
        stderr="\n---CONTINUATION---\n".join(all_stderr),
    )
871 def _get_changed_files(self, worktree_path: Path) -> list[str]:
872 """Get list of files changed in the worktree.
874 Args:
875 worktree_path: Path to the worktree
877 Returns:
878 List of changed file paths relative to repo root
879 """
880 result = subprocess.run(
881 ["git", "diff", "--name-only", self.parallel_config.base_branch, "HEAD"],
882 cwd=worktree_path,
883 capture_output=True,
884 text=True,
885 timeout=30,
886 )
888 if result.returncode != 0:
889 return []
891 return [f.strip() for f in result.stdout.strip().split("\n") if f.strip()]
893 def _update_branch_base(self, worktree_path: Path, issue_id: str) -> tuple[bool, str]:
894 """Fetch origin/main and rebase worker branch onto it.
896 This ensures the worker branch is based on the latest main before
897 merge coordination, preventing conflicts when main advances during
898 sprint execution (BUG-180).
900 Args:
901 worktree_path: Path to the worker's worktree
902 issue_id: Issue ID for logging
904 Returns:
905 Tuple of (success, error_message)
906 """
907 # Fetch latest base branch from configured remote (fall back to local if fetch fails)
908 base = self.parallel_config.base_branch
909 remote = self.parallel_config.remote_name
910 fetch_result = subprocess.run(
911 ["git", "fetch", remote, base],
912 cwd=worktree_path,
913 capture_output=True,
914 text=True,
915 timeout=60,
916 )
918 rebase_target = f"{remote}/{base}" if fetch_result.returncode == 0 else base
920 # Rebase current branch onto base (remote or local fallback)
921 rebase_result = subprocess.run(
922 ["git", "rebase", rebase_target],
923 cwd=worktree_path,
924 capture_output=True,
925 text=True,
926 timeout=120,
927 )
929 if rebase_result.returncode != 0:
930 # Abort the failed rebase
931 subprocess.run(
932 ["git", "rebase", "--abort"],
933 cwd=worktree_path,
934 capture_output=True,
935 timeout=10,
936 )
937 return False, f"Failed to rebase onto {rebase_target}: {rebase_result.stderr}"
939 self.logger.info(f"[{issue_id}] Rebased branch onto {rebase_target}")
940 return True, ""
def _verify_work_was_done(
    self, changed_files: list[str], issue_id: str, issue_filename: str = ""
) -> tuple[bool, str]:
    """Confirm that the worker produced meaningful implementation changes.

    The real decision is delegated to the shared ``verify_work_was_done()``
    helper. That helper is consulted only when
    ``parallel_config.require_code_changes`` is set.

    Args:
        changed_files: Paths changed while processing the issue
        issue_id: Issue being processed (unused; retained for compatibility)
        issue_filename: Full issue filename (unused; retained for compatibility)

    Returns:
        ``(True, "")`` when the work is acceptable, otherwise ``(False, reason)``.
    """
    # An empty change set always fails, even when code changes are optional.
    if not changed_files:
        return False, "No files were changed during implementation"

    requires_code = self.parallel_config.require_code_changes
    if not requires_code or verify_work_was_done(self.logger, changed_files):
        return True, ""

    # Explain the rejection by listing the changed files under excluded directories.
    offenders = []
    for path in changed_files:
        if path and any(path.startswith(prefix) for prefix in EXCLUDED_DIRECTORIES):
            offenders.append(path)

    if not offenders:
        return False, "Only excluded files modified (e.g., .issues/, thoughts/)"

    preview = ", ".join(offenders[:5])
    overflow = len(offenders) - 5
    if overflow > 0:
        preview += f" (+{overflow} more)"
    return False, f"Only excluded files modified: {preview}"
def _has_other_issue_id(self, file_lower: str, current_issue_id_lower: str) -> bool:
    """Check if file contains a different issue ID than the current worker's.

    This prevents cross-worker contamination where worker A detects worker B's
    leaked file. When multiple workers run in parallel, their leaked files may
    both appear in the main repo. Each worker should only clean up its own leaks.

    Args:
        file_lower: Lowercase file path to check
        current_issue_id_lower: Lowercase issue ID of the current worker

    Returns:
        True if the file contains at least one issue ID and none of them is
        the current worker's (belongs to another worker); False if it
        contains the current issue ID or no recognizable issue ID.
    """
    # Issue IDs look like bug-123, enh-456, feat-789, epic-001. The negative
    # lookbehind keeps ordinary words such as "debug-2" or "defeat-3" from
    # being mistaken for bug-2 / feat-3. The group is non-capturing so
    # findall returns the full match.
    found_ids = re.findall(r"(?<![a-z0-9])(?:bug|enh|feat|epic)-\d+", file_lower)

    if not found_ids:
        # No issue ID found - file doesn't belong to any specific worker
        return False

    # Owned by another worker only if none of the IDs is the current one
    return current_issue_id_lower not in found_ids
def _detect_main_repo_leaks(self, issue_id: str, baseline_status: set[str]) -> list[str]:
    """Detect files incorrectly written to main repo instead of worktree.

    Claude Code may write files to the main repository instead of the
    worktree due to project root detection issues (see GitHub #8771).
    This method compares the main repo's ``git status --porcelain`` paths
    after worker execution against the baseline captured before it started.
    It keeps only the new paths that look related to this worker.

    A new path is reported when it:
      - contains this issue's ID as a whole token (``bug-12`` does not
        match ``bug-123``),
      - starts with a source prefix (``backend/``, ``src/``, ``lib/``,
        ``tests/`` or the configured ``src_dir``/``test_dir``),
      - starts with ``thoughts/``, or
      - lives under ``.issues/``/``issues/`` and carries no other issue's ID.
    The orchestrator state file and ``.gitignore`` are never reported.

    Args:
        issue_id: ID of the issue being processed (for pattern matching)
        baseline_status: Set of file paths from git status before the worker
            started, as returned by ``_get_main_repo_baseline``

    Returns:
        List of file paths that were leaked to main repo (empty if
        ``git status`` fails)
    """
    result = self._git_lock.run(
        ["status", "--porcelain"],
        cwd=self.repo_path,
        timeout=30,
    )
    if result.returncode != 0:
        return []

    # Parse with the same helper as _get_main_repo_baseline so that both
    # snapshots are directly comparable.
    current_files = self._parse_porcelain_paths(result.stdout)
    new_files = current_files - baseline_status

    issue_id_lower = issue_id.lower()
    # Whole-token match: not preceded by a letter/digit, not followed by a digit
    own_id_pattern = re.compile(rf"(?<![a-z0-9]){re.escape(issue_id_lower)}(?![0-9])")

    # Build source prefix list: start with common fallbacks, then add configured dirs
    source_prefixes = ["backend/", "src/", "lib/", "tests/"]
    for dir_path in [self.br_config.project.src_dir, self.br_config.project.test_dir]:
        if dir_path:
            normalized = dir_path.rstrip("/") + "/"
            if normalized not in source_prefixes:
                source_prefixes.append(normalized)
    source_prefix_tuple = tuple(source_prefixes)

    leaked_files: list[str] = []
    for file_path in new_files:
        # State file is managed by the orchestrator; .gitignore may be edited by ll-parallel
        if file_path.endswith(".parallel-manage-state.json") or file_path == ".gitignore":
            continue

        file_lower = file_path.lower()
        if issue_id_lower and own_id_pattern.search(file_lower):
            leaked_files.append(file_path)
        elif file_path.startswith(source_prefix_tuple):
            # Source files shouldn't be modified in main by a worker
            leaked_files.append(file_path)
        elif file_path.startswith("thoughts/"):
            leaked_files.append(file_path)
        elif file_path.startswith((".issues/", "issues/")):
            # Files carrying another issue's ID belong to a different worker
            # running in parallel (cross-worker contamination); skip those.
            if not self._has_other_issue_id(file_lower, issue_id_lower):
                leaked_files.append(file_path)

    return leaked_files

def _cleanup_leaked_files(self, leaked_files: list[str]) -> int:
    """Discard leaked files from main repo working directory.

    Claude Code sometimes writes files to the main repo instead of the
    worktree. These files cause stash conflicts during merge operations.
    Since the actual work is preserved in the worktree branch, we can
    safely discard these leaked changes from the main repo.

    Tracked files are restored with ``git checkout --``. Untracked files are
    deleted. Paths git status does not report at all (e.g. gitignored
    files) are deleted directly if they exist. If ``git status`` itself
    fails, nothing is touched, because tracked and untracked files cannot be
    told apart.

    Args:
        leaked_files: List of file paths leaked to main repo

    Returns:
        Number of files successfully cleaned up
    """
    if not leaked_files:
        return 0

    # Get status to determine which files are tracked vs untracked
    status_result = self._git_lock.run(
        ["status", "--porcelain", "--"] + leaked_files,
        cwd=self.repo_path,
        timeout=30,
    )
    if status_result.returncode != 0:
        # Without status output every path would fall through to the unlink
        # fallback below, deleting tracked files outright. Bail out instead.
        self.logger.warning(
            f"Skipping leaked file cleanup: git status failed: {status_result.stderr}"
        )
        return 0

    tracked_files: list[str] = []
    untracked_files: list[str] = []
    for line in status_result.stdout.splitlines():
        if len(line) < 3:
            continue
        status_code = line[:2]
        file_path = line[3:].split(" -> ")[-1].strip()
        if status_code.startswith("?"):
            untracked_files.append(file_path)
        else:
            tracked_files.append(file_path)

    cleaned = 0

    # Discard changes to tracked files
    if tracked_files:
        checkout_result = self._git_lock.run(
            ["checkout", "--"] + tracked_files,
            cwd=self.repo_path,
            timeout=30,
        )
        if checkout_result.returncode == 0:
            cleaned += len(tracked_files)
        else:
            self.logger.warning(
                f"Failed to discard tracked leaked files: {checkout_result.stderr}"
            )

    # Delete untracked files
    for file_path in untracked_files:
        full_path = self.repo_path / file_path
        try:
            if full_path.exists():
                full_path.unlink()
                cleaned += 1
        except OSError as e:
            self.logger.warning(f"Failed to delete leaked file {file_path}: {e}")

    # Fallback: directly delete files not reported by git status.
    # This handles gitignored files that git status --porcelain doesn't show.
    accounted_files = set(tracked_files) | set(untracked_files)
    for file_path in leaked_files:
        if file_path in accounted_files:
            continue
        full_path = self.repo_path / file_path
        if full_path.exists():
            try:
                full_path.unlink()
                cleaned += 1
                self.logger.info(f"Deleted gitignored leaked file: {file_path}")
            except OSError as e:
                self.logger.warning(
                    f"Failed to delete gitignored leaked file {file_path}: {e}"
                )
        else:
            self.logger.debug(f"Leaked file not found (may have been moved): {file_path}")

    if cleaned > 0:
        self.logger.info(f"Cleaned up {cleaned} leaked file(s) from main repo")

    return cleaned

@staticmethod
def _parse_porcelain_paths(porcelain_output: str) -> set[str]:
    """Extract file paths from ``git status --porcelain`` output.

    Each line is ``XY PATH`` (two status characters, a space, the path). For
    renames (``XY OLD -> NEW``) the destination path is kept. Paths that git
    quotes are returned as-is (not unquoted).

    Args:
        porcelain_output: Raw stdout of ``git status --porcelain``

    Returns:
        Set of paths as reported by git
    """
    paths: set[str] = set()
    # splitlines() rather than strip().split("\n"): the status code may begin
    # with a space (" M src/a.py"), and stripping the whole output would eat
    # that space on the first line and shift line[3:] into the path.
    for line in porcelain_output.splitlines():
        if len(line) < 3:
            continue
        file_path = line[3:].strip()
        if " -> " in file_path:
            file_path = file_path.split(" -> ")[-1]
        paths.add(file_path)
    return paths

def _get_main_repo_baseline(self) -> set[str]:
    """Get baseline of modified/untracked files in main repo.

    Returns:
        Set of file paths currently showing in ``git status --porcelain``
        (renames contribute their destination path); empty if the status
        command fails
    """
    result = self._git_lock.run(
        ["status", "--porcelain"],
        cwd=self.repo_path,
        timeout=30,
    )
    if result.returncode != 0:
        return set()
    return self._parse_porcelain_paths(result.stdout)
def _get_main_head_sha(self) -> str:
    """Return the main repository's current ``HEAD`` commit SHA.

    Runs ``git rev-parse HEAD`` in ``repo_path`` through the shared git lock.

    Returns:
        The SHA with surrounding whitespace removed, or ``""`` when the git
        command exits non-zero.
    """
    proc = self._git_lock.run(
        ["rev-parse", "HEAD"],
        cwd=self.repo_path,
        timeout=10,
    )
    return proc.stdout.strip() if proc.returncode == 0 else ""
def _detect_committed_leaks(self, baseline_head_sha: str) -> list[str]:
    """Find commits that landed on the main repo while the worker ran.

    When Claude commits to the main repo instead of the worktree branch, those
    commits show up in main's history while the worktree stays unchanged.
    They are found by comparing main's current HEAD with the SHA captured
    before the worker started and listing ``<baseline>..HEAD``.

    NOTE(review): every commit in that range is returned, whoever made it;
    confirm callers account for commits added to main by other means.

    Args:
        baseline_head_sha: HEAD SHA captured before worker started

    Returns:
        Commit SHAs newer than the baseline, newest first (``git log``
        order). Empty if there is no baseline, HEAD is unknown or unchanged,
        or ``git log`` fails.
    """
    if not baseline_head_sha:
        return []

    head_now = self._get_main_head_sha()
    if not head_now or head_now == baseline_head_sha:
        return []

    log_proc = self._git_lock.run(
        ["log", "--format=%H", f"{baseline_head_sha}..HEAD"],
        cwd=self.repo_path,
        timeout=30,
    )
    if log_proc.returncode != 0:
        return []

    cleaned_lines = (raw.strip() for raw in log_proc.stdout.split("\n"))
    return [commit for commit in cleaned_lines if commit]
def _recover_committed_leaks(
    self,
    leaked_commits: list[str],
    worktree_path: Path,
    baseline_head_sha: str,
    issue_id: str,
) -> bool:
    """Attempt to recover committed leaks by cherry-picking to worktree.

    When Claude commits directly to main instead of the worktree branch,
    we attempt to:
    1. Cherry-pick the leaked commits (oldest first) onto the worktree branch.
       If any pick fails or times out, it is aborted, commits already picked
       in this call are rolled back, and False is returned.
    2. Clean up main. If main's HEAD is still the newest leaked commit, main
       is moved back to the baseline with ``git reset --keep``. ``--keep``
       refuses rather than discarding uncommitted changes that touch the
       affected files. Otherwise the leaked commits are excised with
       ``git rebase --onto <baseline> <newest leaked>``, which preserves
       later commits.

    This preserves the implementation work in the worktree while
    cleaning up the incorrect commits on main.

    Args:
        leaked_commits: Commit SHAs that leaked to main (newest first)
        worktree_path: Path to the worker's worktree
        baseline_head_sha: Main HEAD SHA before worker started
        issue_id: Issue ID for logging

    Returns:
        True if cherry-pick succeeded (main cleanup is attempted but
        not required for a True return value)
    """
    self.logger.info(
        f"[{issue_id}] Attempting recovery: cherry-picking {len(leaked_commits)} "
        f"commit(s) to worktree"
    )

    def _git_worktree(*args: str, timeout: int) -> subprocess.CompletedProcess[str] | None:
        # Run git inside the worktree; None signals that the command timed out.
        try:
            return subprocess.run(
                ["git", *args],
                cwd=worktree_path,
                capture_output=True,
                text=True,
                timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            return None

    # Remember the branch tip so a partially applied series can be undone
    head_result = _git_worktree("rev-parse", "HEAD", timeout=10)
    start_sha = (
        head_result.stdout.strip()
        if head_result is not None and head_result.returncode == 0
        else ""
    )

    # Cherry-pick in chronological order (oldest first = reverse of log output)
    applied = 0
    for sha in reversed(leaked_commits):
        result = _git_worktree("cherry-pick", sha, timeout=60)
        if result is not None and result.returncode == 0:
            applied += 1
            continue

        _git_worktree("cherry-pick", "--abort", timeout=10)
        reason = "timed out" if result is None else result.stderr.strip()
        self.logger.warning(f"[{issue_id}] Cherry-pick of {sha[:8]} failed: {reason}")
        if applied and start_sha:
            # Don't leave half of the leaked series on the worker branch
            rollback = _git_worktree("reset", "--keep", start_sha, timeout=30)
            if rollback is None or rollback.returncode != 0:
                self.logger.warning(
                    f"[{issue_id}] Could not roll back {applied} cherry-picked "
                    f"commit(s) in worktree"
                )
        return False

    # Attempt to reset main to baseline (only if main hasn't advanced further)
    current_main_sha = self._get_main_head_sha()
    most_recent_leaked = leaked_commits[0]  # Newest first
    if current_main_sha == most_recent_leaked:
        # --keep (unlike --hard) aborts instead of discarding uncommitted
        # changes in the main repo that touch files changed by the leaks.
        reset_result = self._git_lock.run(
            ["reset", "--keep", baseline_head_sha],
            cwd=self.repo_path,
            timeout=30,
        )
        if reset_result.returncode == 0:
            self.logger.info(f"[{issue_id}] Reset main to baseline {baseline_head_sha[:8]}")
        else:
            self.logger.warning(
                f"[{issue_id}] Cherry-pick succeeded but failed to reset main: "
                f"{reset_result.stderr.strip()}"
            )
    else:
        # main has advanced past the leaked commits — attempt surgical rebase
        # to excise only the leaked commits while preserving subsequent work
        self.logger.info(
            f"[{issue_id}] Main has advanced beyond leaked commits "
            f"({current_main_sha[:8]} != {most_recent_leaked[:8]}) — "
            f"attempting surgical rebase to excise leaked commits"
        )
        rebase_result = self._git_lock.run(
            ["rebase", "--onto", baseline_head_sha, most_recent_leaked],
            cwd=self.repo_path,
            timeout=60,
        )
        if rebase_result.returncode == 0:
            self.logger.info(f"[{issue_id}] Surgically removed leaked commits via rebase")
        else:
            self._git_lock.run(
                ["rebase", "--abort"],
                cwd=self.repo_path,
                timeout=10,
            )
            self.logger.warning(
                f"[{issue_id}] Surgical rebase failed — manual cleanup required: "
                f"{rebase_result.stderr.strip()}"
            )

    self.logger.info(
        f"[{issue_id}] Recovered {len(leaked_commits)} commit(s): "
        f"cherry-picked to worktree branch"
    )
    return True
@property
def active_count(self) -> int:
    """Number of workers that still count as active.

    Counts futures in ``_active_workers`` that are not yet done. It also
    adds every entry in ``_pending_callbacks``, i.e. workers whose future
    finished but whose callback has not completed yet. Each collection is
    read under its own lock.
    """
    with self._process_lock:
        still_running = len([fut for fut in self._active_workers.values() if not fut.done()])
    with self._callback_lock:
        awaiting_callback = len(self._pending_callbacks)
    return still_running + awaiting_callback
def set_worker_stage(self, issue_id: str, stage: WorkerStage) -> None:
    """Record the current stage for a worker, replacing any previous value.

    Args:
        issue_id: Issue whose worker is being updated
        stage: Stage to store for that worker
    """
    with self._process_lock:
        self._worker_stages.update({issue_id: stage})
def get_worker_stage(self, issue_id: str) -> WorkerStage | None:
    """Look up the stage recorded for a worker.

    Args:
        issue_id: Issue whose worker stage is requested

    Returns:
        The stored stage, or None when the issue has no recorded stage
    """
    with self._process_lock:
        stage = self._worker_stages.get(issue_id)
    return stage
def get_active_stages(self) -> dict[str, WorkerStage]:
    """Return recorded stages for workers that are currently registered.

    Only issues that also appear as keys of ``_active_workers`` are
    included; stages left behind for other issues are filtered out.

    Returns:
        New dictionary mapping issue_id to its recorded stage
    """
    with self._process_lock:
        registered = self._active_workers
        return {
            worker_id: current
            for worker_id, current in self._worker_stages.items()
            if worker_id in registered
        }
def remove_worker_stage(self, issue_id: str) -> None:
    """Stop tracking the stage of a worker; unknown issue IDs are ignored.

    Args:
        issue_id: Issue whose stage entry should be dropped
    """
    with self._process_lock:
        if issue_id in self._worker_stages:
            del self._worker_stages[issue_id]
def cleanup_all_worktrees(self) -> None:
    """Remove every little-loops worker worktree under the configured base.

    Does nothing when ``repo_path / parallel_config.worktree_base`` does not
    exist. Otherwise, each subdirectory whose name ``_is_ll_worktree``
    accepts is handed to ``_cleanup_worktree``, and a summary is logged.
    """
    worktree_base = self.repo_path / self.parallel_config.worktree_base
    if not worktree_base.exists():
        return

    from little_loops.worktree_utils import _is_ll_worktree

    for entry in worktree_base.iterdir():
        if not entry.is_dir():
            continue
        if _is_ll_worktree(entry.name):
            self._cleanup_worktree(entry)

    self.logger.info("Cleaned up all worker worktrees")