Coverage for little_loops / cli / sprint / show.py: 82%

209 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""ll-sprint show subcommand and dependency visualization renderers.""" 

2 

3from __future__ import annotations 

4 

5import argparse 

6from pathlib import Path 

7from typing import TYPE_CHECKING, Any 

8 

9from little_loops.cli.output import colorize, format_relative_time, print_json, terminal_width 

10from little_loops.cli.sprint._helpers import ( 

11 _build_issue_contents, 

12 _render_dependency_analysis, 

13 _render_execution_plan, 

14) 

15from little_loops.dependency_graph import DependencyGraph, refine_waves_for_contention 

16from little_loops.logger import Logger 

17 

18if TYPE_CHECKING: 

19 from little_loops.dependency_graph import WaveContentionNote 

20 from little_loops.sprint import SprintManager 

21 

22 

23def _render_dependency_graph( 

24 waves: list[list[Any]], 

25 dep_graph: DependencyGraph, 

26) -> str: 

27 """Render ASCII dependency graph. 

28 

29 Args: 

30 waves: List of execution waves 

31 dep_graph: DependencyGraph for looking up relationships 

32 

33 Returns: 

34 Formatted string showing dependency arrows 

35 """ 

36 if not waves or len(waves) <= 1: 

37 return "" 

38 

39 # Don't render graph if there are no actual dependency edges 

40 # (waves > 1 can happen from file overlap splitting alone) 

41 all_ids = {issue.issue_id for wave in waves for issue in wave} 

42 has_edges = any(dep_graph.blocks.get(issue_id, set()) & all_ids for issue_id in all_ids) 

43 if not has_edges: 

44 return "" 

45 

46 width = terminal_width() 

47 lines: list[str] = [] 

48 header_text = "Dependency Graph" 

49 fill = "\u2500" * max(0, width - len(header_text) - 4) 

50 lines.append("") 

51 lines.append(f"\u2500\u2500 {header_text} {fill}") 

52 lines.append("") 

53 

54 # Build chains: track which issues block what 

55 # Show each independent chain on its own line 

56 chains: list[str] = [] 

57 visited: set[str] = set() 

58 

59 def build_chain(issue_id: str) -> str: 

60 """Recursively build chain string from issue.""" 

61 if issue_id in visited: 

62 return issue_id 

63 visited.add(issue_id) 

64 

65 blocked_issues = sorted(dep_graph.blocks.get(issue_id, set())) 

66 if not blocked_issues: 

67 return issue_id 

68 

69 if len(blocked_issues) == 1: 

70 return f"{issue_id} \u2500\u2500\u2192 {build_chain(blocked_issues[0])}" 

71 else: 

72 # Multiple branches - show first inline, note others 

73 result = f"{issue_id} \u2500\u2500\u2192 {build_chain(blocked_issues[0])}" 

74 for other in blocked_issues[1:]: 

75 if other not in visited: 

76 chains.append(f" {issue_id} \u2500\u2500\u2192 {build_chain(other)}") 

77 return result 

78 

79 # Find root issues structurally (not blocked by anything in this graph) 

80 roots = [iid for iid in sorted(all_ids) if not (dep_graph.blocked_by.get(iid, set()) & all_ids)] 

81 

82 for root in roots: 

83 if root not in visited: 

84 chain = build_chain(root) 

85 if chain and "──→" in chain: 

86 chains.append(f" {chain}") 

87 

88 lines.extend(chains) 

89 lines.append("") 

90 lines.append("Legend: \u2500\u2500\u2192 blocks (must complete before)") 

91 

92 return "\n".join(lines) 

93 

94 

95def _render_health_summary( 

96 waves: list[list[Any]], 

97 contention_notes: list[WaveContentionNote | None] | None, 

98 has_cycles: bool, 

99 invalid: set[str], 

100 dep_report: Any | None = None, 

101 issue_to_wave: dict[str, int] | None = None, 

102) -> str: 

103 """Render a one-line sprint health summary. 

104 

105 Returns: 

106 Health summary string like "OK -- 5 issues in 1 wave, contention serialized" 

107 """ 

108 total_issues = sum(len(w) for w in waves) 

109 

110 _STATUS_COLOR = {"OK": "32", "REVIEW": "33", "WARNING": "38;5;208", "BLOCKED": "31"} 

111 

112 if has_cycles: 

113 return f"{colorize('BLOCKED', _STATUS_COLOR['BLOCKED'])} -- dependency cycles detected" 

114 

115 if invalid: 

116 return f"{colorize('WARNING', _STATUS_COLOR['WARNING'])} -- {len(invalid)} issue(s) not found on disk" 

117 

118 # Check for novel (unsatisfied) high-confidence proposals 

119 if dep_report and dep_report.proposals and issue_to_wave is not None: 

120 novel_count = 0 

121 for p in dep_report.proposals: 

122 target_wave = issue_to_wave.get(p.target_id) 

123 source_wave = issue_to_wave.get(p.source_id) 

124 if target_wave is None or source_wave is None or target_wave >= source_wave: 

125 if p.confidence >= 0.5: 

126 novel_count += 1 

127 if novel_count > 0: 

128 return f"{colorize('REVIEW', _STATUS_COLOR['REVIEW'])} -- {novel_count} potential dependency(ies) to review" 

129 

130 # Count logical waves (group contention sub-waves) 

131 notes = contention_notes or [None] * len(waves) 

132 logical_count = 0 

133 has_contention = False 

134 prev_parent: int | None = None 

135 for idx in range(len(waves)): 

136 note = notes[idx] if idx < len(notes) else None 

137 if note is not None: 

138 has_contention = True 

139 if prev_parent is None or note.parent_wave_index != prev_parent: 

140 logical_count += 1 

141 prev_parent = note.parent_wave_index 

142 else: 

143 logical_count += 1 

144 prev_parent = None 

145 

146 wave_word = "wave" if logical_count == 1 else "waves" 

147 suffix = ", overlap serialized" if has_contention else ", all parallelizable" 

148 if logical_count == 1 and total_issues == 1: 

149 suffix = "" 

150 

151 return f"{colorize('OK', _STATUS_COLOR['OK'])} -- {total_issues} issues in {logical_count} {wave_word}{suffix}" 

152 

153 

154def _cmd_sprint_show(args: argparse.Namespace, manager: SprintManager) -> int: 

155 """Show sprint details with dependency visualization.""" 

156 logger = Logger() 

157 sprint = manager.load(args.sprint) 

158 if not sprint: 

159 logger.error(f"Sprint not found: {args.sprint}") 

160 return 1 

161 

162 # Validate issues 

163 valid = manager.validate_issues(sprint.issues) 

164 invalid = set(sprint.issues) - set(valid.keys()) 

165 

166 # Load full IssueInfo objects for dependency analysis 

167 issue_infos = manager.load_issue_infos(list(valid.keys())) 

168 dep_graph: DependencyGraph | None = None 

169 waves: list[list[Any]] = [] 

170 contention_notes: list[WaveContentionNote | None] | None = None 

171 has_cycles = False 

172 

173 # Gather all issue IDs on disk to avoid false "nonexistent" warnings 

174 from little_loops.dependency_mapper import gather_all_issue_ids 

175 

176 config = manager.config 

177 issues_dir = config.project_root / config.issues.base_dir if config else Path(".issues") 

178 all_known_ids = gather_all_issue_ids(issues_dir, config=config) 

179 

180 if issue_infos: 

181 dep_graph = DependencyGraph.from_issues(issue_infos, all_known_ids=all_known_ids) 

182 has_cycles = dep_graph.has_cycles() 

183 

184 if not has_cycles: 

185 waves = dep_graph.get_execution_waves() 

186 dep_config = config.dependency_mapping if config else None 

187 waves, contention_notes = refine_waves_for_contention(waves, config=dep_config) 

188 

189 # JSON early-exit 

190 if getattr(args, "json", False): 

191 return _show_json(sprint, issue_infos, waves, contention_notes, has_cycles, dep_graph) 

192 

193 print(f"{colorize('Sprint:', '1')} {sprint.name}") 

194 if sprint.description: 

195 print(f"Description: {sprint.description}") 

196 print(f"Created: {_format_created(sprint.created)}") 

197 

198 # Options on a single compact line right after metadata 

199 if sprint.options: 

200 opts = sprint.options 

201 print( 

202 f"Options: max_workers={opts.max_workers}, timeout={opts.timeout}s, max_iterations={opts.max_iterations}" 

203 ) 

204 

205 # Sprint run state from .sprint-state.json 

206 _print_run_state(sprint.name) 

207 

208 # Dependency analysis (ENH-301) - run before health summary so we can reference it 

209 dep_report: Any = None 

210 issue_to_wave: dict[str, int] = {} 

211 if issue_infos and not args.skip_analysis: 

212 from little_loops.dependency_mapper import analyze_dependencies 

213 

214 issue_contents = _build_issue_contents(issue_infos) 

215 dep_report = analyze_dependencies( 

216 issue_infos, issue_contents, all_known_ids=all_known_ids, config=dep_config 

217 ) 

218 

219 # Build wave ordering map so we can filter already-satisfied proposals 

220 for wave_idx, wave in enumerate(waves): 

221 for issue in wave: 

222 issue_to_wave[issue.issue_id] = wave_idx 

223 

224 # Sprint health summary 

225 if waves: 

226 health = _render_health_summary( 

227 waves, 

228 contention_notes, 

229 has_cycles, 

230 invalid, 

231 dep_report=dep_report, 

232 issue_to_wave=issue_to_wave if issue_to_wave else None, 

233 ) 

234 print(f"Sprint health: {health}") 

235 

236 # Composition breakdown 

237 _print_composition(issue_infos) 

238 

239 # Show execution plan if we have dependency info and no cycles 

240 if waves and dep_graph: 

241 print(_render_execution_plan(waves, dep_graph, contention_notes, config=dep_config)) 

242 print(_render_dependency_graph(waves, dep_graph)) 

243 else: 

244 # Fallback to simple list if no valid issues or cycles 

245 print(f"Issues ({len(sprint.issues)}):") 

246 for issue_id in sprint.issues: 

247 status = "valid" if issue_id in valid else "NOT FOUND" 

248 print(f" - {issue_id} ({status})") 

249 

250 # Warn about cycles if detected 

251 if has_cycles and dep_graph: 

252 cycles = dep_graph.detect_cycles() 

253 print("\nWarning: Dependency cycles detected:") 

254 for cycle in cycles: 

255 print(f" {' -> '.join(cycle)}") 

256 

257 # Render dependency analysis output 

258 if dep_report is not None: 

259 _render_dependency_analysis( 

260 dep_report, 

261 logger, 

262 issue_to_wave=issue_to_wave if issue_to_wave else None, 

263 config=dep_config, 

264 ) 

265 

266 if invalid: 

267 print(f"\nWarning: {len(invalid)} issue(s) not found") 

268 

269 return 0 

270 

271 

272# --------------------------------------------------------------------------- 

273# Helper functions for enhanced show output (ENH-923) 

274# --------------------------------------------------------------------------- 

275 

276 

277def _format_created(iso_str: str) -> str: 

278 """Format an ISO 8601 created timestamp as a human-friendly string.""" 

279 import time 

280 from datetime import UTC, datetime 

281 

282 try: 

283 dt = datetime.fromisoformat(iso_str) 

284 if dt.tzinfo is None: 

285 dt = dt.replace(tzinfo=UTC) 

286 formatted = dt.strftime("%Y-%m-%d %H:%M UTC") 

287 elapsed = time.time() - dt.timestamp() 

288 if elapsed >= 0: 

289 return f"{formatted} ({format_relative_time(elapsed)})" 

290 return formatted 

291 except (ValueError, OSError): 

292 return iso_str 

293 

294 

295def _print_composition(issue_infos: list[Any]) -> None: 

296 """Print type/priority composition breakdown.""" 

297 if not issue_infos: 

298 return 

299 from collections import Counter 

300 

301 types: Counter[str] = Counter() 

302 priorities: Counter[str] = Counter() 

303 for info in issue_infos: 

304 issue_type = info.issue_id.split("-", 1)[0] 

305 types[issue_type] += 1 

306 if info.priority: 

307 priorities[info.priority] += 1 

308 

309 type_parts = [f"{count} {t}" for t, count in sorted(types.items())] 

310 prio_parts = [f"{p}: {count}" for p, count in sorted(priorities.items())] 

311 print(f"Composition: {', '.join(type_parts)} | {', '.join(prio_parts)}") 

312 

313 

314def _print_run_state(sprint_name: str) -> None: 

315 """Print last run state if .sprint-state.json exists for this sprint.""" 

316 import json 

317 

318 state_file = Path.cwd() / ".sprint-state.json" 

319 if not state_file.exists(): 

320 return 

321 try: 

322 data = json.loads(state_file.read_text()) 

323 if data.get("sprint_name") != sprint_name: 

324 return 

325 completed = data.get("completed_issues", []) 

326 failed = data.get("failed_issues", {}) 

327 skipped = data.get("skipped_blocked_issues", {}) 

328 total = len(completed) + len(failed) + len(skipped) 

329 

330 started = data.get("started_at", "") 

331 date_str = started[:10] if started else "unknown" 

332 

333 parts = [f"{len(completed)} completed"] 

334 if failed: 

335 failed_ids = ", ".join(sorted(failed.keys())) 

336 parts.append(f"{len(failed)} failed ({failed_ids})") 

337 if skipped: 

338 parts.append(f"{len(skipped)} skipped") 

339 print(f"Last run: {date_str} \u2014 {', '.join(parts)} of {total}") 

340 except (json.JSONDecodeError, OSError): 

341 pass 

342 

343 

344def _show_json( 

345 sprint: Any, 

346 issue_infos: list[Any], 

347 waves: list[list[Any]], 

348 contention_notes: Any, 

349 has_cycles: bool, 

350 dep_graph: Any, 

351) -> int: 

352 """Render sprint show output as JSON.""" 

353 issues_data = [] 

354 for info in issue_infos: 

355 issues_data.append( 

356 { 

357 "id": info.issue_id, 

358 "title": info.title, 

359 "priority": info.priority, 

360 "path": str(info.path), 

361 "confidence_score": info.confidence_score, 

362 "outcome_confidence": info.outcome_confidence, 

363 } 

364 ) 

365 

366 waves_data = [] 

367 for wave_idx, wave in enumerate(waves): 

368 waves_data.append( 

369 { 

370 "wave": wave_idx + 1, 

371 "issues": [issue.issue_id for issue in wave], 

372 } 

373 ) 

374 

375 data = { 

376 "name": sprint.name, 

377 "description": sprint.description or None, 

378 "created": sprint.created, 

379 "issues": issues_data, 

380 "waves": waves_data, 

381 "has_cycles": has_cycles, 

382 } 

383 print_json(data) 

384 return 0