Coverage for little_loops / cli / sprint / run.py: 86%
318 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""ll-sprint run subcommand with signal handling and state management."""
3from __future__ import annotations
5import os
6import signal
7import subprocess
8import sys
9from pathlib import Path
10from types import FrameType
11from typing import TYPE_CHECKING
13from little_loops.cli.output import use_color_enabled
14from little_loops.cli.sprint._helpers import _build_issue_contents, _render_dependency_analysis
15from little_loops.cli_args import _id_matches, parse_issue_ids, parse_issue_types, parse_labels
16from little_loops.dependency_graph import DependencyGraph, refine_waves_for_contention
17from little_loops.logger import Logger, format_duration
18from little_loops.parallel.orchestrator import ParallelOrchestrator
19from little_loops.sprint import SprintManager, SprintState
21if TYPE_CHECKING:
22 import argparse
24 from little_loops.config import BRConfig
# Module-level shutdown flag for ll-sprint signal handling (ENH-183).
# Set to True by _sprint_signal_handler on the first SIGINT/SIGTERM; reset to
# False at the start of each _cmd_sprint_run and polled there around each wave.
_sprint_shutdown_requested: bool = False
# Logger the signal handler writes its warnings to. Bound by _cmd_sprint_run for
# the duration of a run and cleared back to None by it; while None, the handler
# prints to stderr instead.
_sprint_logger: Logger | None = None
def _sprint_signal_handler(signum: int, frame: FrameType | None) -> None:
    """Signal handler installed by ``_cmd_sprint_run`` for SIGINT and SIGTERM.

    The first signal only raises the module-level shutdown flag so the sprint
    can finish its current wave before exiting. A signal arriving while the
    flag is already set aborts immediately with ``sys.exit(1)``.

    Messages are emitted as warnings on ``_sprint_logger`` when one is bound,
    otherwise they are printed to stderr. ``signum`` and ``frame`` are unused.
    """
    global _sprint_shutdown_requested

    def _announce(text: str) -> None:
        # Prefer the active run's logger; fall back to stderr when none is bound.
        if _sprint_logger is None:
            print(text, file=sys.stderr)
        else:
            _sprint_logger.warning(text)

    if not _sprint_shutdown_requested:
        # First signal: request a graceful stop after the current wave.
        _sprint_shutdown_requested = True
        _announce("\nShutdown requested, will exit after current wave...")
        return

    # Repeated signal: the user wants out now.
    _announce("\nForce shutdown requested")
    sys.exit(1)
def _get_sprint_state_file() -> Path:
    """Return the location of the sprint checkpoint file.

    The file is named ``.sprint-state.json`` and lives in the current working
    directory, which is re-read on every call.
    """
    state_name = ".sprint-state.json"
    return Path.cwd().joinpath(state_name)
def _load_sprint_state(logger: Logger) -> SprintState | None:
    """Load the saved sprint checkpoint, if a usable one exists.

    Args:
        logger: Receives an info message on success or a warning on failure.

    Returns:
        The deserialized ``SprintState``, or ``None`` when no state file exists
        or it cannot be read, decoded, or converted. In the failure case a
        warning is logged, and the caller falls back to a fresh run.
    """
    import json

    state_file = _get_sprint_state_file()
    if not state_file.exists():
        return None
    try:
        data = json.loads(state_file.read_text(encoding="utf-8"))
        if not isinstance(data, dict):
            # A valid-JSON but non-object payload (list, string, ...) is not a checkpoint.
            raise TypeError(f"expected a JSON object, got {type(data).__name__}")
        state = SprintState.from_dict(data)
    except (OSError, ValueError, KeyError, TypeError) as e:
        # OSError: file unreadable. ValueError: covers json.JSONDecodeError and
        # UnicodeDecodeError. KeyError/TypeError: missing or (presumably) mistyped
        # fields rejected by SprintState.from_dict. A corrupt checkpoint must not
        # crash --resume; the caller starts fresh instead.
        logger.warning(f"Failed to load state: {e}")
        return None
    logger.info(f"State loaded from {state_file}")
    return state
def _save_sprint_state(state: SprintState, logger: Logger) -> None:
    """Checkpoint *state* to the sprint state file.

    Stamps ``state.last_checkpoint`` with the current local time. It then
    writes the JSON to a sibling ``.tmp`` file and renames that over the real
    state file. An interruption mid-write (e.g. a forced shutdown via a second
    signal) therefore leaves the previous checkpoint intact instead of a
    truncated file that would break ``--resume``.

    Args:
        state: Sprint progress to persist; its ``last_checkpoint`` is updated.
        logger: Receives an info message once the file is written.
    """
    import json
    from datetime import datetime

    state.last_checkpoint = datetime.now().isoformat()
    state_file = _get_sprint_state_file()
    payload = json.dumps(state.to_dict(), indent=2)
    tmp_file = state_file.with_name(state_file.name + ".tmp")
    try:
        tmp_file.write_text(payload, encoding="utf-8")
        # os.replace overwrites the destination in a single rename.
        os.replace(tmp_file, state_file)
    except BaseException:
        # Don't leave a stray partial temp file behind; propagate the original error.
        tmp_file.unlink(missing_ok=True)
        raise
    logger.info(f"State saved to {state_file}")
def _cleanup_sprint_state(logger: Logger) -> None:
    """Delete the sprint state file when one is present.

    Logs an info message only when a file was actually removed; does nothing
    if no state file exists.
    """
    target = _get_sprint_state_file()
    if not target.exists():
        return
    target.unlink()
    logger.info("Sprint state file cleaned up")
def _cmd_sprint_run(
    args: argparse.Namespace,
    manager: SprintManager,
    config: BRConfig,
) -> int:
    """Execute a sprint with dependency-aware scheduling.

    Binds the module-level ``_sprint_logger`` and installs the ll-sprint
    SIGINT/SIGTERM handlers (ENH-183) for the duration of the run. On every
    exit path (early returns, exceptions, and ``SystemExit`` from a forced
    shutdown) it restores the previously installed handlers and clears the
    logger, so process-global state does not leak into the caller.

    Args:
        args: Parsed ``ll-sprint run`` arguments.
        manager: Used to load the sprint definition and its issues.
        config: Project configuration.

    Returns:
        Process exit code: 0 on success / dry run / nothing to do, 1 on
        failure or requested shutdown, 130 on ``KeyboardInterrupt``.
    """
    global _sprint_logger, _sprint_shutdown_requested

    logger = Logger(verbose=not args.quiet, use_color=use_color_enabled())
    _sprint_logger = logger

    # Setup signal handlers for graceful shutdown (ENH-183)
    _sprint_shutdown_requested = False  # Reset in case of multiple runs
    previous_sigint = signal.signal(signal.SIGINT, _sprint_signal_handler)
    previous_sigterm = signal.signal(signal.SIGTERM, _sprint_signal_handler)
    try:
        return _run_sprint(args, manager, config, logger)
    finally:
        # signal.signal() returns None when the previous handler was not installed
        # from Python; such a handler cannot be re-installed, so leave ours in place.
        if previous_sigint is not None:
            signal.signal(signal.SIGINT, previous_sigint)
        if previous_sigterm is not None:
            signal.signal(signal.SIGTERM, previous_sigterm)
        _sprint_logger = None  # Clear for test isolation


def _run_sprint(
    args: argparse.Namespace,
    manager: SprintManager,
    config: BRConfig,
    logger: Logger,
) -> int:
    """Filter, validate, plan, and execute the sprint waves; return the exit code.

    Called by ``_cmd_sprint_run``, which owns signal-handler installation and
    the module-level ``_sprint_logger``. This function only reads the
    ``_sprint_shutdown_requested`` flag.
    """
    from datetime import datetime

    handoff_threshold = getattr(args, "handoff_threshold", None)
    if handoff_threshold is not None:
        os.environ["LL_HANDOFF_THRESHOLD"] = str(handoff_threshold)

    context_limit = getattr(args, "context_limit", None)
    if context_limit is not None:
        os.environ["LL_CONTEXT_LIMIT"] = str(context_limit)

    sprint = manager.load(args.sprint)
    if not sprint:
        logger.error(f"Sprint not found: {args.sprint}")
        return 1

    # Apply skip filter if provided
    issues_to_process = list(sprint.issues)
    skip_ids = parse_issue_ids(args.skip)
    if skip_ids:
        original_count = len(issues_to_process)
        issues_to_process = [i for i in issues_to_process if i not in skip_ids]
        skipped = original_count - len(issues_to_process)
        if skipped > 0:
            logger.info(f"Skipping {skipped} issue(s): {', '.join(sorted(skip_ids))}")

    # Apply only filter if provided
    only_ids = parse_issue_ids(getattr(args, "only", None))
    if only_ids:
        invalid_only = {p for p in only_ids if not any(_id_matches(i, p) for i in sprint.issues)}
        if invalid_only:
            logger.error(
                f"Issue(s) not found in sprint definition: {', '.join(sorted(invalid_only))}"
            )
            return 1
        issues_to_process = [
            i for i in issues_to_process if any(_id_matches(i, p) for p in only_ids)
        ]
        logger.info(
            f"Processing only {len(issues_to_process)} issue(s): {', '.join(sorted(only_ids))}"
        )

    # Apply type filter if provided
    type_prefixes = parse_issue_types(getattr(args, "type", None))
    if type_prefixes:
        original_count = len(issues_to_process)
        issues_to_process = [i for i in issues_to_process if i.split("-", 1)[0] in type_prefixes]
        filtered = original_count - len(issues_to_process)
        if filtered > 0:
            logger.info(f"Filtered {filtered} issue(s) by type: {', '.join(sorted(type_prefixes))}")

    # Pre-validate: skip issues with status: done/cancelled (ENH-1424)
    pre_completed_skipped: list[str] = []

    # Validate issues exist
    valid = manager.validate_issues(issues_to_process)
    invalid = set(issues_to_process) - set(valid.keys())

    if invalid:
        logger.error(f"Issue IDs not found: {', '.join(sorted(invalid))}")
        logger.info("Cannot execute sprint with missing issues")
        return 1

    # Write milestone: field back to issue files for all valid issues in this sprint
    from little_loops.frontmatter import parse_frontmatter, update_frontmatter

    for _issue_id, path in valid.items():
        content = path.read_text(encoding="utf-8")
        updated = update_frontmatter(content, {"milestone": sprint.name})
        if updated != content:
            path.write_text(updated, encoding="utf-8")
    logger.info(f"Set milestone: {sprint.name!r} on {len(valid)} issue file(s)")

    # Skip issues already completed via frontmatter status (ENH-1424)
    still_active: list[str] = []
    for issue_id in issues_to_process:
        path = valid[issue_id]
        fm = parse_frontmatter(path.read_text(encoding="utf-8"))
        if fm.get("status", "open") in ("done", "cancelled"):
            logger.info(f" {issue_id}: already completed (status: {fm.get('status')}), skipping")
            pre_completed_skipped.append(issue_id)
        else:
            still_active.append(issue_id)
    if pre_completed_skipped:
        logger.info(
            f"Pre-validation: {len(pre_completed_skipped)} issue(s) already completed, "
            f"{len(still_active)} active"
        )
    issues_to_process = still_active

    if pre_completed_skipped and not issues_to_process:
        logger.info("All sprint issues already completed - nothing to process")
        return 0

    # Load full IssueInfo objects for dependency analysis
    issue_infos = manager.load_issue_infos(issues_to_process)
    if not issue_infos:
        logger.error("No issue files found")
        return 1

    # Apply label filter if provided (must run after IssueInfo load since labels come from frontmatter)
    label_filter = parse_labels(getattr(args, "label", None))
    if label_filter:
        original_count = len(issue_infos)
        issue_infos = [i for i in issue_infos if any(lb.lower() in label_filter for lb in i.labels)]
        filtered = original_count - len(issue_infos)
        if filtered > 0:
            logger.info(f"Filtered {filtered} issue(s) by label: {', '.join(sorted(label_filter))}")
        issues_to_process = [i.issue_id for i in issue_infos]

    # Gather all issue IDs on disk to avoid false "nonexistent" warnings
    from little_loops.dependency_mapper import gather_all_issue_ids

    issues_dir = config.project_root / config.issues.base_dir
    all_known_ids = gather_all_issue_ids(issues_dir, config=config)

    # Dependency analysis (ENH-301)
    if not getattr(args, "skip_analysis", False):
        from little_loops.dependency_mapper import analyze_dependencies

        issue_contents = _build_issue_contents(issue_infos)
        dep_config = config.dependency_mapping
        dep_report = analyze_dependencies(
            issue_infos, issue_contents, all_known_ids=all_known_ids, config=dep_config
        )
        _render_dependency_analysis(dep_report, logger, config=dep_config)

    # Build dependency graph
    dep_graph = DependencyGraph.from_issues(issue_infos, all_known_ids=all_known_ids)

    # Detect cycles
    if dep_graph.has_cycles():
        cycles = dep_graph.detect_cycles()
        for cycle in cycles:
            logger.error(f"Dependency cycle detected: {' -> '.join(cycle)}")
        return 1

    # Get execution waves
    try:
        waves = dep_graph.get_execution_waves()
    except ValueError as e:
        logger.error(str(e))
        return 1

    # Refine waves for file overlap (ENH-306)
    waves, contention_notes = refine_waves_for_contention(waves, config=config.dependency_mapping)

    # Display execution plan
    logger.info(f"Running sprint: {sprint.name}")
    logger.info("Dependency analysis:")
    for i, wave in enumerate(waves, 1):
        issue_ids = ", ".join(issue.issue_id for issue in wave)
        # Same bounds-checked lookup as the execution loop below, so a notes list
        # shorter than the wave list cannot raise IndexError here.
        note = (
            contention_notes[i - 1]
            if contention_notes and i - 1 < len(contention_notes)
            else None
        )
        if note:
            logger.info(
                f" Wave {i}: {issue_ids}"
                f" [sub-wave {note.sub_wave_index + 1}/{note.total_sub_waves}]"
            )
        else:
            logger.info(f" Wave {i}: {issue_ids}")

    if args.dry_run:
        logger.info("\nDry run mode - no changes will be made")
        return 0

    # Initialize or load state
    state: SprintState
    start_wave = 1

    if args.resume:
        loaded_state = _load_sprint_state(logger)
        if loaded_state and loaded_state.sprint_name == args.sprint:
            state = loaded_state
            # Find first incomplete wave by checking completed issues
            completed_set = set(state.completed_issues)
            for i, wave in enumerate(waves, 1):
                wave_issue_ids = {issue.issue_id for issue in wave}
                if not wave_issue_ids.issubset(completed_set):
                    start_wave = i
                    break
            else:
                # All waves completed
                logger.info("Sprint already completed - nothing to resume")
                _cleanup_sprint_state(logger)
                return 0
            logger.info(f"Resuming from wave {start_wave}/{len(waves)}")
            logger.info(f" Previously completed: {len(state.completed_issues)} issues")
        else:
            if loaded_state:
                logger.warning(
                    f"State file is for sprint '{loaded_state.sprint_name}', "
                    f"not '{args.sprint}' - starting fresh"
                )
            else:
                logger.warning("No valid state found - starting fresh")
            state = SprintState(
                sprint_name=args.sprint,
                started_at=datetime.now().isoformat(),
            )
    else:
        # Fresh start - delete any old state
        _cleanup_sprint_state(logger)
        state = SprintState(
            sprint_name=args.sprint,
            started_at=datetime.now().isoformat(),
        )

    # Track exit status for error handling (ENH-185)
    exit_code = 0

    try:
        # Determine max workers
        max_workers = args.max_workers or (sprint.options.max_workers if sprint.options else 2)

        # Execute wave by wave
        completed: set[str] = set(state.completed_issues)
        failed_waves = 0
        total_duration = 0.0
        total_waves = len(waves)

        for wave_num, wave in enumerate(waves, 1):
            # Check for shutdown request (ENH-183)
            if _sprint_shutdown_requested:
                logger.warning("Shutdown requested - saving state and exiting")
                _save_sprint_state(state, logger)
                exit_code = 1
                return exit_code

            # Skip already-completed waves when resuming
            if wave_num < start_wave:
                continue

            wave_ids = [issue.issue_id for issue in wave]
            state.current_wave = wave_num
            logger.info(f"\nProcessing wave {wave_num}/{total_waves}: {', '.join(wave_ids)}")

            wave_note = (
                contention_notes[wave_num - 1]
                if contention_notes and wave_num - 1 < len(contention_notes)
                else None
            )
            is_contention_subwave = wave_note is not None

            if len(wave) == 1 or is_contention_subwave:
                # Single issue OR contention sub-wave — process in-place sequentially
                # (contention sub-waves are displayed as "serialized steps" so must run that way)
                from little_loops.issue_manager import process_issue_inplace

                wave_failed = False
                for issue in wave:
                    issue_result = process_issue_inplace(
                        info=issue,
                        config=config,
                        logger=logger,
                        dry_run=args.dry_run,
                    )
                    total_duration += issue_result.duration
                    if issue_result.success:
                        completed.add(issue.issue_id)
                        state.completed_issues.append(issue.issue_id)
                        state.timing[issue.issue_id] = {"total": issue_result.duration}
                        logger.success(f" {issue.issue_id}: completed")
                    elif issue_result.was_blocked:
                        completed.add(issue.issue_id)
                        state.skipped_blocked_issues[issue.issue_id] = issue_result.failure_reason
                        logger.warning(f" {issue.issue_id}: skipped (blocked by open dependency)")
                    else:
                        wave_failed = True
                        completed.add(issue.issue_id)
                        state.failed_issues[issue.issue_id] = "Issue processing failed"
                        logger.warning(f" {issue.issue_id}: failed")
                if wave_failed:
                    failed_waves += 1
                    logger.warning(f"Wave {wave_num}/{total_waves} had failures")
                else:
                    logger.success(
                        f"Wave {wave_num}/{total_waves} completed: {', '.join(wave_ids)}"
                    )
                _save_sprint_state(state, logger)
                if wave_num < total_waves:
                    logger.info(f"Continuing to wave {wave_num + 1}/{total_waves}...")
                    # Check for shutdown before next wave (ENH-183)
                    if _sprint_shutdown_requested:
                        logger.warning("Shutdown requested - exiting after wave completion")
                        exit_code = 1
                        return exit_code
            else:
                # Multi-issue — use ParallelOrchestrator with worktrees
                only_ids = set(wave_ids)
                # Detect current branch for rebase/merge operations (BUG-439)
                _br = subprocess.run(
                    ["git", "rev-parse", "--abbrev-ref", "HEAD"],
                    capture_output=True,
                    text=True,
                    cwd=Path.cwd(),
                )
                _base_branch = _br.stdout.strip() if _br.returncode == 0 else "main"
                # Runtime overlap detection disabled for sprints (ENH-512):
                # refine_waves_for_contention() already splits overlapping
                # issues into separate sub-waves before dispatch.
                parallel_config = config.create_parallel_config(
                    max_workers=min(max_workers, len(wave)),
                    only_ids=only_ids,
                    dry_run=args.dry_run,
                    overlap_detection=False,
                    serialize_overlapping=True,
                    base_branch=_base_branch,
                    clean_start=True,  # Sprint manages its own state; don't load stale orchestrator state
                )

                from little_loops.events import EventBus
                from little_loops.extension import wire_extensions
                from little_loops.transport import wire_transports

                event_bus = EventBus()
                wire_extensions(event_bus, config.extensions)
                wire_transports(event_bus, config.events)
                orchestrator = ParallelOrchestrator(
                    parallel_config,
                    config,
                    Path.cwd(),
                    wave_label=f"Wave {wave_num}/{total_waves}",
                    event_bus=event_bus,
                )
                result = orchestrator.run()
                total_duration += orchestrator.execution_duration

                # Track completed/failed from this wave using per-issue results
                actually_completed = set(orchestrator.queue.completed_ids)
                actually_failed = set(orchestrator.queue.failed_ids)

                for issue_id in wave_ids:
                    if issue_id in actually_completed:
                        completed.add(issue_id)
                        state.completed_issues.append(issue_id)
                        state.timing[issue_id] = {
                            "total": orchestrator.execution_duration / len(wave)
                        }
                    elif issue_id in actually_failed:
                        completed.add(issue_id)
                        state.failed_issues[issue_id] = "Issue failed during wave execution"
                    # else: issue was neither completed nor failed (interrupted/stranded)
                    # — leave untracked so it can be retried on resume

                # Sequential retry for failed issues (ENH-308)
                if actually_failed:
                    logger.info(f"Retrying {len(actually_failed)} failed issue(s) sequentially...")
                    from little_loops.issue_manager import process_issue_inplace

                    retried_ok = 0
                    for issue in wave:
                        if issue.issue_id not in actually_failed:
                            continue
                        logger.info(f" Retrying {issue.issue_id} in-place...")
                        retry_result = process_issue_inplace(
                            info=issue,
                            config=config,
                            logger=logger,
                            dry_run=args.dry_run,
                        )
                        total_duration += retry_result.duration
                        if retry_result.success:
                            retried_ok += 1
                            state.failed_issues.pop(issue.issue_id, None)
                            state.completed_issues.append(issue.issue_id)
                            state.timing[issue.issue_id] = {"total": retry_result.duration}
                            logger.success(f" Retry succeeded: {issue.issue_id}")
                        elif retry_result.was_blocked:
                            state.failed_issues.pop(issue.issue_id, None)
                            state.skipped_blocked_issues[issue.issue_id] = (
                                retry_result.failure_reason
                            )
                            logger.warning(
                                f" Retry skipped: {issue.issue_id} (blocked by open dependency)"
                            )
                        else:
                            logger.warning(f" Retry failed: {issue.issue_id}")
                    if retried_ok > 0:
                        logger.info(
                            f"Sequential retry recovered {retried_ok}/{len(actually_failed)} issue(s)"
                        )

                # Check whether failures remain after retry (ENH-308)
                remaining_failures = {iid for iid in actually_failed if iid in state.failed_issues}
                if result == 0 or not remaining_failures:
                    logger.success(
                        f"Wave {wave_num}/{total_waves} completed: {', '.join(wave_ids)}"
                    )
                else:
                    failed_waves += 1
                    logger.warning(f"Wave {wave_num}/{total_waves} had failures")
                _save_sprint_state(state, logger)
                if wave_num < total_waves:
                    logger.info(f"Continuing to wave {wave_num + 1}/{total_waves}...")
                    # Check for shutdown before next wave (ENH-183)
                    if _sprint_shutdown_requested:
                        logger.warning("Shutdown requested - exiting after wave completion")
                        exit_code = 1
                        return exit_code

        wave_word = "wave" if len(waves) == 1 else "waves"
        skip_msg = (
            f", {len(pre_completed_skipped)} already completed (skipped)"
            if pre_completed_skipped
            else ""
        )
        blocked_msg = (
            f", {len(state.skipped_blocked_issues)} skipped (blocked)"
            if state.skipped_blocked_issues
            else ""
        )
        logger.info(
            f"\nSprint completed: {len(completed)} issues processed "
            f"({len(waves)} {wave_word}){skip_msg}{blocked_msg}"
        )
        logger.timing(f"Total execution time: {format_duration(total_duration)}")
        if failed_waves > 0:
            logger.warning(f"{failed_waves} wave(s) had failures")
            exit_code = 1
        else:
            # Clean up state on successful completion
            _cleanup_sprint_state(logger)

    except KeyboardInterrupt:
        # Belt-and-suspenders with signal handler (ENH-185)
        logger.warning("Sprint interrupted by user (KeyboardInterrupt)")
        exit_code = 130

    except SystemExit:
        # A second signal forces exit via sys.exit() inside _sprint_signal_handler.
        # SystemExit is not an Exception subclass, so without this clause the
        # finally block below would see exit_code == 0 and skip the state save.
        # Mark the run as failed, let finally checkpoint, then keep exiting.
        exit_code = 1
        raise

    except Exception as e:
        # Catch unexpected exceptions (ENH-185)
        logger.error(f"Sprint failed unexpectedly: {e}")
        exit_code = 1

    finally:
        # Guaranteed state save on any non-success exit (ENH-185)
        if exit_code != 0:
            _save_sprint_state(state, logger)
            logger.info("State saved before exit")

    return exit_code