Coverage for little_loops / cli / sprint / run.py: 86%

318 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""ll-sprint run subcommand with signal handling and state management.""" 

2 

3from __future__ import annotations 

4 

5import os 

6import signal 

7import subprocess 

8import sys 

9from pathlib import Path 

10from types import FrameType 

11from typing import TYPE_CHECKING 

12 

13from little_loops.cli.output import use_color_enabled 

14from little_loops.cli.sprint._helpers import _build_issue_contents, _render_dependency_analysis 

15from little_loops.cli_args import _id_matches, parse_issue_ids, parse_issue_types, parse_labels 

16from little_loops.dependency_graph import DependencyGraph, refine_waves_for_contention 

17from little_loops.logger import Logger, format_duration 

18from little_loops.parallel.orchestrator import ParallelOrchestrator 

19from little_loops.sprint import SprintManager, SprintState 

20 

21if TYPE_CHECKING: 

22 import argparse 

23 

24 from little_loops.config import BRConfig 

25 

# Module-level shutdown state for ll-sprint signal handling (ENH-183).
# Flipped to True by the first SIGINT/SIGTERM; the run loop polls it between waves.
_sprint_shutdown_requested: bool = False
# Logger the signal handler reports through; None means "print to stderr instead".
_sprint_logger: Logger | None = None


def _sprint_signal_handler(signum: int, frame: FrameType | None) -> None:
    """Two-stage shutdown handler for ll-sprint.

    The first signal only records the request so the sprint can finish the
    wave it is working on and exit cleanly. Any subsequent signal aborts the
    process immediately via ``sys.exit(1)``.

    Args:
        signum: Number of the received signal (unused).
        frame: Interrupted stack frame (unused).
    """
    global _sprint_shutdown_requested

    force = _sprint_shutdown_requested
    if force:
        msg = "\nForce shutdown requested"
    else:
        _sprint_shutdown_requested = True
        msg = "\nShutdown requested, will exit after current wave..."

    if _sprint_logger is None:
        print(msg, file=sys.stderr)
    else:
        _sprint_logger.warning(msg)

    if force:
        sys.exit(1)

52 

53 

def _get_sprint_state_file() -> Path:
    """Return the location of the ll-sprint checkpoint file.

    The file is ``.sprint-state.json`` in the current working directory; the
    directory is looked up on every call rather than cached.
    """
    return Path.cwd().joinpath(".sprint-state.json")

57 

58 

def _load_sprint_state(logger: Logger) -> SprintState | None:
    """Load the persisted sprint state, if there is a usable one.

    Returns ``None`` when no state file exists, and also when the file cannot
    be used: unreadable, not valid UTF-8/JSON, not a JSON object, or rejected
    by ``SprintState.from_dict``. Those failure cases log a warning so that
    ``--resume`` falls back to a fresh start instead of crashing.

    Args:
        logger: Logger for the load/failure messages.

    Returns:
        The restored ``SprintState`` or ``None``.
    """
    import json

    state_file = _get_sprint_state_file()
    if not state_file.exists():
        return None
    try:
        # Explicit UTF-8 rather than the platform's locale encoding.
        data = json.loads(state_file.read_text(encoding="utf-8"))
        if not isinstance(data, dict):
            raise ValueError(f"expected a JSON object, got {type(data).__name__}")
        state = SprintState.from_dict(data)
    except (OSError, ValueError, KeyError, TypeError) as e:
        # ValueError also covers json.JSONDecodeError and UnicodeDecodeError;
        # TypeError guards against malformed field values reaching from_dict.
        logger.warning(f"Failed to load state: {e}")
        return None
    logger.info(f"State loaded from {state_file}")
    return state

74 

75 

def _save_sprint_state(state: SprintState, logger: Logger) -> None:
    """Checkpoint ``state`` to the sprint state file.

    Stamps ``state.last_checkpoint`` with the current (naive, local) time,
    writes the JSON to a sibling temporary file and then moves it over the
    state file with ``os.replace``. This function runs on shutdown and
    interrupt paths, and the forced-exit signal handler calls ``sys.exit``
    asynchronously. Writing in place could therefore leave a truncated file
    that ``--resume`` can no longer parse; the rename never exposes a
    half-written file.

    Args:
        state: Sprint state to persist (its ``last_checkpoint`` is updated).
        logger: Logger for the confirmation message.
    """
    import json
    from datetime import datetime

    state.last_checkpoint = datetime.now().isoformat()
    state_file = _get_sprint_state_file()
    # Same directory as the target, so os.replace is a rename on one filesystem.
    tmp_file = state_file.with_name(state_file.name + ".tmp")
    try:
        tmp_file.write_text(json.dumps(state.to_dict(), indent=2), encoding="utf-8")
        os.replace(tmp_file, state_file)
    except BaseException:
        # Best effort: don't leave a stray temp file behind, then propagate.
        try:
            tmp_file.unlink()
        except OSError:
            pass
        raise
    logger.info(f"State saved to {state_file}")

85 

86 

def _cleanup_sprint_state(logger: Logger) -> None:
    """Remove the sprint state file, logging only when a file was actually deleted.

    Uses try/unlink instead of exists()/unlink() so a file that disappears
    between the check and the delete does not raise ``FileNotFoundError``.

    Args:
        logger: Logger for the cleanup message.
    """
    state_file = _get_sprint_state_file()
    try:
        state_file.unlink()
    except FileNotFoundError:
        # Nothing to clean up.
        return
    logger.info("Sprint state file cleaned up")

94 

def _cmd_sprint_run(
    args: argparse.Namespace,
    manager: SprintManager,
    config: BRConfig,
) -> int:
    """Execute a sprint with dependency-aware scheduling.

    Installs the two-stage SIGINT/SIGTERM handler (ENH-183) and publishes this
    run's logger in the module-level ``_sprint_logger`` so the handler can
    report through it, then delegates the work to ``_execute_sprint``.

    The previously installed signal handlers are restored, and
    ``_sprint_logger`` is cleared, on every exit path. Previously only the
    fall-through path cleared the logger, and the handlers were never
    restored. The exit paths include early validation failures, in-loop
    shutdown returns and propagating exceptions.

    Args:
        args: Parsed ``ll-sprint run`` arguments.
        manager: Sprint manager used to load the sprint and its issues.
        config: Project configuration.

    Returns:
        Process exit code: 0 on success, 1 on failure or requested shutdown,
        130 when interrupted by ``KeyboardInterrupt``.
    """
    global _sprint_logger, _sprint_shutdown_requested

    logger = Logger(verbose=not args.quiet, use_color=use_color_enabled())
    _sprint_logger = logger

    # Setup signal handlers for graceful shutdown (ENH-183)
    _sprint_shutdown_requested = False  # Reset in case of multiple runs
    previous_sigint = signal.signal(signal.SIGINT, _sprint_signal_handler)
    previous_sigterm = signal.signal(signal.SIGTERM, _sprint_signal_handler)
    try:
        return _execute_sprint(args, manager, config, logger)
    finally:
        # signal.signal() returns None when the previous handler was not
        # installed from Python; there is nothing to restore in that case.
        if previous_sigint is not None:
            signal.signal(signal.SIGINT, previous_sigint)
        if previous_sigterm is not None:
            signal.signal(signal.SIGTERM, previous_sigterm)
        _sprint_logger = None  # Clear for test isolation


def _apply_issue_id_filters(
    args: argparse.Namespace,
    sprint_issues: list[str],
    logger: Logger,
) -> list[str] | None:
    """Apply the ``--skip``, ``--only`` and ``--type`` filters to the sprint's issue IDs.

    Args:
        args: Parsed arguments (reads ``skip``, ``only`` and ``type``).
        sprint_issues: Issue IDs from the sprint definition, in sprint order.
        logger: Logger for filter summaries and errors.

    Returns:
        The filtered issue IDs (sprint order preserved), or ``None`` when
        ``--only`` names an issue that matches nothing in the sprint
        definition (an error has already been logged).
    """
    issues_to_process = list(sprint_issues)

    # Apply skip filter if provided
    skip_ids = parse_issue_ids(args.skip)
    if skip_ids:
        original_count = len(issues_to_process)
        issues_to_process = [i for i in issues_to_process if i not in skip_ids]
        skipped = original_count - len(issues_to_process)
        if skipped > 0:
            logger.info(f"Skipping {skipped} issue(s): {', '.join(sorted(skip_ids))}")

    # Apply only filter if provided; every pattern must match the full sprint definition
    only_ids = parse_issue_ids(getattr(args, "only", None))
    if only_ids:
        invalid_only = {p for p in only_ids if not any(_id_matches(i, p) for i in sprint_issues)}
        if invalid_only:
            logger.error(
                f"Issue(s) not found in sprint definition: {', '.join(sorted(invalid_only))}"
            )
            return None
        issues_to_process = [
            i for i in issues_to_process if any(_id_matches(i, p) for p in only_ids)
        ]
        logger.info(
            f"Processing only {len(issues_to_process)} issue(s): {', '.join(sorted(only_ids))}"
        )

    # Apply type filter if provided (compares the ID prefix before the first "-")
    type_prefixes = parse_issue_types(getattr(args, "type", None))
    if type_prefixes:
        original_count = len(issues_to_process)
        issues_to_process = [i for i in issues_to_process if i.split("-", 1)[0] in type_prefixes]
        filtered = original_count - len(issues_to_process)
        if filtered > 0:
            logger.info(f"Filtered {filtered} issue(s) by type: {', '.join(sorted(type_prefixes))}")

    return issues_to_process


def _execute_sprint(
    args: argparse.Namespace,
    manager: SprintManager,
    config: BRConfig,
    logger: Logger,
) -> int:
    """Validate, plan and run the sprint named by ``args.sprint``.

    Steps:

    1. Filter the issues and validate them.
    2. Stamp ``milestone`` into each issue file.
    3. Drop issues whose frontmatter status is done or cancelled.
    4. Build dependency-ordered waves.
    5. Execute the waves one by one.

    Single-issue waves and contention sub-waves run in place, sequentially.
    Other waves run through ``ParallelOrchestrator``, followed by a
    sequential in-place retry of the issues that failed. State is
    checkpointed after each wave so ``--resume`` can continue.

    Returns:
        Process exit code (see ``_cmd_sprint_run``).
    """
    from datetime import datetime

    handoff_threshold = getattr(args, "handoff_threshold", None)
    if handoff_threshold is not None:
        os.environ["LL_HANDOFF_THRESHOLD"] = str(handoff_threshold)

    context_limit = getattr(args, "context_limit", None)
    if context_limit is not None:
        os.environ["LL_CONTEXT_LIMIT"] = str(context_limit)

    sprint = manager.load(args.sprint)
    if not sprint:
        logger.error(f"Sprint not found: {args.sprint}")
        return 1

    filtered_ids = _apply_issue_id_filters(args, list(sprint.issues), logger)
    if filtered_ids is None:
        return 1
    issues_to_process = filtered_ids

    # Pre-validate: skip issues with status: done/cancelled (ENH-1424)
    pre_completed_skipped: list[str] = []

    # Validate issues exist
    valid = manager.validate_issues(issues_to_process)
    invalid = set(issues_to_process) - set(valid.keys())

    if invalid:
        logger.error(f"Issue IDs not found: {', '.join(sorted(invalid))}")
        logger.info("Cannot execute sprint with missing issues")
        return 1

    # Write milestone: field back to issue files for all valid issues in this sprint
    from little_loops.frontmatter import parse_frontmatter, update_frontmatter

    for path in valid.values():
        content = path.read_text(encoding="utf-8")
        updated = update_frontmatter(content, {"milestone": sprint.name})
        if updated != content:
            path.write_text(updated, encoding="utf-8")
    logger.info(f"Set milestone: {sprint.name!r} on {len(valid)} issue file(s)")

    # Skip issues already completed via frontmatter status (ENH-1424)
    still_active: list[str] = []
    for issue_id in issues_to_process:
        fm = parse_frontmatter(valid[issue_id].read_text(encoding="utf-8"))
        if fm.get("status", "open") in ("done", "cancelled"):
            logger.info(f" {issue_id}: already completed (status: {fm.get('status')}), skipping")
            pre_completed_skipped.append(issue_id)
        else:
            still_active.append(issue_id)
    if pre_completed_skipped:
        logger.info(
            f"Pre-validation: {len(pre_completed_skipped)} issue(s) already completed, "
            f"{len(still_active)} active"
        )
    issues_to_process = still_active

    if pre_completed_skipped and not issues_to_process:
        logger.info("All sprint issues already completed - nothing to process")
        return 0

    # Load full IssueInfo objects for dependency analysis
    issue_infos = manager.load_issue_infos(issues_to_process)
    if not issue_infos:
        logger.error("No issue files found")
        return 1

    # Apply label filter if provided (must run after IssueInfo load since labels come from frontmatter)
    label_filter = parse_labels(getattr(args, "label", None))
    if label_filter:
        original_count = len(issue_infos)
        issue_infos = [i for i in issue_infos if any(lb.lower() in label_filter for lb in i.labels)]
        filtered = original_count - len(issue_infos)
        if filtered > 0:
            logger.info(f"Filtered {filtered} issue(s) by label: {', '.join(sorted(label_filter))}")

    # Gather all issue IDs on disk to avoid false "nonexistent" warnings
    from little_loops.dependency_mapper import gather_all_issue_ids

    issues_dir = config.project_root / config.issues.base_dir
    all_known_ids = gather_all_issue_ids(issues_dir, config=config)

    # Dependency analysis (ENH-301)
    if not getattr(args, "skip_analysis", False):
        from little_loops.dependency_mapper import analyze_dependencies

        issue_contents = _build_issue_contents(issue_infos)
        dep_config = config.dependency_mapping
        dep_report = analyze_dependencies(
            issue_infos, issue_contents, all_known_ids=all_known_ids, config=dep_config
        )
        _render_dependency_analysis(dep_report, logger, config=dep_config)

    # Build dependency graph
    dep_graph = DependencyGraph.from_issues(issue_infos, all_known_ids=all_known_ids)

    # Detect cycles
    if dep_graph.has_cycles():
        for cycle in dep_graph.detect_cycles():
            logger.error(f"Dependency cycle detected: {' -> '.join(cycle)}")
        return 1

    # Get execution waves
    try:
        waves = dep_graph.get_execution_waves()
    except ValueError as e:
        logger.error(str(e))
        return 1

    # Refine waves for file overlap (ENH-306)
    waves, contention_notes = refine_waves_for_contention(waves, config=config.dependency_mapping)

    # Display execution plan
    logger.info(f"Running sprint: {sprint.name}")
    logger.info("Dependency analysis:")
    for i, wave in enumerate(waves, 1):
        issue_ids = ", ".join(issue.issue_id for issue in wave)
        # Same bounds guard as the execution loop: never index past the notes list.
        note = (
            contention_notes[i - 1]
            if contention_notes and i - 1 < len(contention_notes)
            else None
        )
        if note:
            logger.info(
                f" Wave {i}: {issue_ids}"
                f" [sub-wave {note.sub_wave_index + 1}/{note.total_sub_waves}]"
            )
        else:
            logger.info(f" Wave {i}: {issue_ids}")

    if args.dry_run:
        logger.info("\nDry run mode - no changes will be made")
        return 0

    # Initialize or load state
    state: SprintState
    start_wave = 1

    if args.resume:
        loaded_state = _load_sprint_state(logger)
        if loaded_state and loaded_state.sprint_name == args.sprint:
            state = loaded_state
            # Find first incomplete wave by checking completed issues
            completed_set = set(state.completed_issues)
            for i, wave in enumerate(waves, 1):
                wave_issue_ids = {issue.issue_id for issue in wave}
                if not wave_issue_ids.issubset(completed_set):
                    start_wave = i
                    break
            else:
                # All waves completed
                logger.info("Sprint already completed - nothing to resume")
                _cleanup_sprint_state(logger)
                return 0
            logger.info(f"Resuming from wave {start_wave}/{len(waves)}")
            logger.info(f" Previously completed: {len(state.completed_issues)} issues")
        else:
            if loaded_state:
                logger.warning(
                    f"State file is for sprint '{loaded_state.sprint_name}', "
                    f"not '{args.sprint}' - starting fresh"
                )
            else:
                logger.warning("No valid state found - starting fresh")
            state = SprintState(
                sprint_name=args.sprint,
                started_at=datetime.now().isoformat(),
            )
    else:
        # Fresh start - delete any old state
        _cleanup_sprint_state(logger)
        state = SprintState(
            sprint_name=args.sprint,
            started_at=datetime.now().isoformat(),
        )

    # Track exit status for error handling (ENH-185)
    exit_code = 0

    try:
        # Determine max workers
        max_workers = args.max_workers or (sprint.options.max_workers if sprint.options else 2)

        # Execute wave by wave
        completed: set[str] = set(state.completed_issues)
        failed_waves = 0
        total_duration = 0.0
        total_waves = len(waves)

        for wave_num, wave in enumerate(waves, 1):
            # Check for shutdown request (ENH-183)
            if _sprint_shutdown_requested:
                logger.warning("Shutdown requested - saving state and exiting")
                _save_sprint_state(state, logger)
                exit_code = 1
                return exit_code

            # Skip already-completed waves when resuming
            if wave_num < start_wave:
                continue

            wave_ids = [issue.issue_id for issue in wave]
            state.current_wave = wave_num
            logger.info(f"\nProcessing wave {wave_num}/{total_waves}: {', '.join(wave_ids)}")

            wave_note = (
                contention_notes[wave_num - 1]
                if contention_notes and wave_num - 1 < len(contention_notes)
                else None
            )
            is_contention_subwave = wave_note is not None

            if len(wave) == 1 or is_contention_subwave:
                # Single issue OR contention sub-wave — process in-place sequentially
                # (contention sub-waves are displayed as "serialized steps" so must run that way)
                from little_loops.issue_manager import process_issue_inplace

                wave_failed = False
                for issue in wave:
                    issue_result = process_issue_inplace(
                        info=issue,
                        config=config,
                        logger=logger,
                        dry_run=args.dry_run,
                    )
                    total_duration += issue_result.duration
                    if issue_result.success:
                        completed.add(issue.issue_id)
                        state.completed_issues.append(issue.issue_id)
                        state.timing[issue.issue_id] = {"total": issue_result.duration}
                        logger.success(f" {issue.issue_id}: completed")
                    elif issue_result.was_blocked:
                        completed.add(issue.issue_id)
                        state.skipped_blocked_issues[issue.issue_id] = issue_result.failure_reason
                        logger.warning(f" {issue.issue_id}: skipped (blocked by open dependency)")
                    else:
                        wave_failed = True
                        completed.add(issue.issue_id)
                        state.failed_issues[issue.issue_id] = "Issue processing failed"
                        logger.warning(f" {issue.issue_id}: failed")
                if wave_failed:
                    failed_waves += 1
                    logger.warning(f"Wave {wave_num}/{total_waves} had failures")
                else:
                    logger.success(
                        f"Wave {wave_num}/{total_waves} completed: {', '.join(wave_ids)}"
                    )
                _save_sprint_state(state, logger)
                if wave_num < total_waves:
                    logger.info(f"Continuing to wave {wave_num + 1}/{total_waves}...")
                    # Check for shutdown before next wave (ENH-183)
                    if _sprint_shutdown_requested:
                        logger.warning("Shutdown requested - exiting after wave completion")
                        exit_code = 1
                        return exit_code
            else:
                # Multi-issue — use ParallelOrchestrator with worktrees
                wave_id_set = set(wave_ids)
                # Detect current branch for rebase/merge operations (BUG-439)
                branch_proc = subprocess.run(
                    ["git", "rev-parse", "--abbrev-ref", "HEAD"],
                    capture_output=True,
                    text=True,
                    cwd=Path.cwd(),
                )
                base_branch = branch_proc.stdout.strip() if branch_proc.returncode == 0 else "main"
                # Runtime overlap detection disabled for sprints (ENH-512):
                # refine_waves_for_contention() already splits overlapping
                # issues into separate sub-waves before dispatch.
                parallel_config = config.create_parallel_config(
                    max_workers=min(max_workers, len(wave)),
                    only_ids=wave_id_set,
                    dry_run=args.dry_run,
                    overlap_detection=False,
                    serialize_overlapping=True,
                    base_branch=base_branch,
                    clean_start=True,  # Sprint manages its own state; don't load stale orchestrator state
                )

                from little_loops.events import EventBus
                from little_loops.extension import wire_extensions
                from little_loops.transport import wire_transports

                event_bus = EventBus()
                wire_extensions(event_bus, config.extensions)
                wire_transports(event_bus, config.events)
                orchestrator = ParallelOrchestrator(
                    parallel_config,
                    config,
                    Path.cwd(),
                    wave_label=f"Wave {wave_num}/{total_waves}",
                    event_bus=event_bus,
                )
                result = orchestrator.run()
                total_duration += orchestrator.execution_duration

                # Track completed/failed from this wave using per-issue results
                actually_completed = set(orchestrator.queue.completed_ids)
                actually_failed = set(orchestrator.queue.failed_ids)

                for issue_id in wave_ids:
                    if issue_id in actually_completed:
                        completed.add(issue_id)
                        state.completed_issues.append(issue_id)
                        state.timing[issue_id] = {
                            "total": orchestrator.execution_duration / len(wave)
                        }
                    elif issue_id in actually_failed:
                        completed.add(issue_id)
                        state.failed_issues[issue_id] = "Issue failed during wave execution"
                    # else: issue was neither completed nor failed (interrupted/stranded)
                    # — leave untracked so it can be retried on resume

                # Sequential retry for failed issues (ENH-308)
                if actually_failed:
                    logger.info(f"Retrying {len(actually_failed)} failed issue(s) sequentially...")
                    from little_loops.issue_manager import process_issue_inplace

                    retried_ok = 0
                    for issue in wave:
                        if issue.issue_id not in actually_failed:
                            continue
                        logger.info(f" Retrying {issue.issue_id} in-place...")
                        retry_result = process_issue_inplace(
                            info=issue,
                            config=config,
                            logger=logger,
                            dry_run=args.dry_run,
                        )
                        total_duration += retry_result.duration
                        if retry_result.success:
                            retried_ok += 1
                            state.failed_issues.pop(issue.issue_id, None)
                            state.completed_issues.append(issue.issue_id)
                            state.timing[issue.issue_id] = {"total": retry_result.duration}
                            logger.success(f" Retry succeeded: {issue.issue_id}")
                        elif retry_result.was_blocked:
                            state.failed_issues.pop(issue.issue_id, None)
                            state.skipped_blocked_issues[issue.issue_id] = (
                                retry_result.failure_reason
                            )
                            logger.warning(
                                f" Retry skipped: {issue.issue_id} (blocked by open dependency)"
                            )
                        else:
                            logger.warning(f" Retry failed: {issue.issue_id}")
                    if retried_ok > 0:
                        logger.info(
                            f"Sequential retry recovered {retried_ok}/{len(actually_failed)} issue(s)"
                        )

                # Check whether failures remain after retry (ENH-308)
                remaining_failures = {iid for iid in actually_failed if iid in state.failed_issues}
                if result == 0 or not remaining_failures:
                    logger.success(
                        f"Wave {wave_num}/{total_waves} completed: {', '.join(wave_ids)}"
                    )
                else:
                    failed_waves += 1
                    logger.warning(f"Wave {wave_num}/{total_waves} had failures")
                _save_sprint_state(state, logger)
                if wave_num < total_waves:
                    logger.info(f"Continuing to wave {wave_num + 1}/{total_waves}...")
                    # Check for shutdown before next wave (ENH-183)
                    if _sprint_shutdown_requested:
                        logger.warning("Shutdown requested - exiting after wave completion")
                        exit_code = 1
                        return exit_code

        wave_word = "wave" if len(waves) == 1 else "waves"
        skip_msg = (
            f", {len(pre_completed_skipped)} already completed (skipped)"
            if pre_completed_skipped
            else ""
        )
        blocked_msg = (
            f", {len(state.skipped_blocked_issues)} skipped (blocked)"
            if state.skipped_blocked_issues
            else ""
        )
        logger.info(
            f"\nSprint completed: {len(completed)} issues processed "
            f"({len(waves)} {wave_word}){skip_msg}{blocked_msg}"
        )
        logger.timing(f"Total execution time: {format_duration(total_duration)}")
        if failed_waves > 0:
            logger.warning(f"{failed_waves} wave(s) had failures")
            exit_code = 1
        else:
            # Clean up state on successful completion
            _cleanup_sprint_state(logger)

    except KeyboardInterrupt:
        # Belt-and-suspenders with signal handler (ENH-185)
        logger.warning("Sprint interrupted by user (KeyboardInterrupt)")
        exit_code = 130

    except SystemExit:
        # A second SIGINT/SIGTERM makes _sprint_signal_handler call sys.exit(1).
        # Mark the run as failed so the finally clause still persists progress,
        # then let the exit propagate unchanged.
        exit_code = 1
        raise

    except Exception as e:
        # Catch unexpected exceptions (ENH-185)
        logger.error(f"Sprint failed unexpectedly: {e}")
        exit_code = 1

    finally:
        # Guaranteed state save on any non-success exit (ENH-185)
        if exit_code != 0:
            _save_sprint_state(state, logger)
            logger.info("State saved before exit")

    return exit_code