Coverage for little_loops/cli/sprint/show.py: 82%
209 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""ll-sprint show subcommand and dependency visualization renderers."""
3from __future__ import annotations
5import argparse
6from pathlib import Path
7from typing import TYPE_CHECKING, Any
9from little_loops.cli.output import colorize, format_relative_time, print_json, terminal_width
10from little_loops.cli.sprint._helpers import (
11 _build_issue_contents,
12 _render_dependency_analysis,
13 _render_execution_plan,
14)
15from little_loops.dependency_graph import DependencyGraph, refine_waves_for_contention
16from little_loops.logger import Logger
18if TYPE_CHECKING:
19 from little_loops.dependency_graph import WaveContentionNote
20 from little_loops.sprint import SprintManager
23def _render_dependency_graph(
24 waves: list[list[Any]],
25 dep_graph: DependencyGraph,
26) -> str:
27 """Render ASCII dependency graph.
29 Args:
30 waves: List of execution waves
31 dep_graph: DependencyGraph for looking up relationships
33 Returns:
34 Formatted string showing dependency arrows
35 """
36 if not waves or len(waves) <= 1:
37 return ""
39 # Don't render graph if there are no actual dependency edges
40 # (waves > 1 can happen from file overlap splitting alone)
41 all_ids = {issue.issue_id for wave in waves for issue in wave}
42 has_edges = any(dep_graph.blocks.get(issue_id, set()) & all_ids for issue_id in all_ids)
43 if not has_edges:
44 return ""
46 width = terminal_width()
47 lines: list[str] = []
48 header_text = "Dependency Graph"
49 fill = "\u2500" * max(0, width - len(header_text) - 4)
50 lines.append("")
51 lines.append(f"\u2500\u2500 {header_text} {fill}")
52 lines.append("")
54 # Build chains: track which issues block what
55 # Show each independent chain on its own line
56 chains: list[str] = []
57 visited: set[str] = set()
59 def build_chain(issue_id: str) -> str:
60 """Recursively build chain string from issue."""
61 if issue_id in visited:
62 return issue_id
63 visited.add(issue_id)
65 blocked_issues = sorted(dep_graph.blocks.get(issue_id, set()))
66 if not blocked_issues:
67 return issue_id
69 if len(blocked_issues) == 1:
70 return f"{issue_id} \u2500\u2500\u2192 {build_chain(blocked_issues[0])}"
71 else:
72 # Multiple branches - show first inline, note others
73 result = f"{issue_id} \u2500\u2500\u2192 {build_chain(blocked_issues[0])}"
74 for other in blocked_issues[1:]:
75 if other not in visited:
76 chains.append(f" {issue_id} \u2500\u2500\u2192 {build_chain(other)}")
77 return result
79 # Find root issues structurally (not blocked by anything in this graph)
80 roots = [iid for iid in sorted(all_ids) if not (dep_graph.blocked_by.get(iid, set()) & all_ids)]
82 for root in roots:
83 if root not in visited:
84 chain = build_chain(root)
85 if chain and "──→" in chain:
86 chains.append(f" {chain}")
88 lines.extend(chains)
89 lines.append("")
90 lines.append("Legend: \u2500\u2500\u2192 blocks (must complete before)")
92 return "\n".join(lines)
def _render_health_summary(
    waves: list[list[Any]],
    contention_notes: list[WaveContentionNote | None] | None,
    has_cycles: bool,
    invalid: set[str],
    dep_report: Any | None = None,
    issue_to_wave: dict[str, int] | None = None,
) -> str:
    """Render a one-line sprint health summary.

    The first matching status wins, in this order: BLOCKED (cycles),
    WARNING (issues missing on disk), REVIEW (unsatisfied proposals with a
    confidence of at least 0.5), OK.

    Args:
        waves: Execution waves, each a list of issues.
        contention_notes: Optional per-wave notes; a non-None entry marks the
            wave as a sub-wave of the logical wave ``parent_wave_index``.
        has_cycles: Whether the dependency graph contains cycles.
        invalid: IDs of sprint issues that were not found.
        dep_report: Optional analysis report exposing ``proposals``.
        issue_to_wave: Optional mapping of issue ID to wave index; proposals
            are only evaluated when it is provided.

    Returns:
        Health summary string like "OK -- 5 issues in 2 waves, overlap serialized".
    """
    status_color = {"OK": "32", "REVIEW": "33", "WARNING": "38;5;208", "BLOCKED": "31"}

    if has_cycles:
        return f"{colorize('BLOCKED', status_color['BLOCKED'])} -- dependency cycles detected"

    if invalid:
        return f"{colorize('WARNING', status_color['WARNING'])} -- {len(invalid)} issue(s) not found on disk"

    # Check for novel (unsatisfied) high-confidence proposals
    if dep_report and dep_report.proposals and issue_to_wave is not None:
        novel_count = 0
        for proposal in dep_report.proposals:
            target_wave = issue_to_wave.get(proposal.target_id)
            source_wave = issue_to_wave.get(proposal.source_id)
            # Unsatisfied: an endpoint is not in any wave, or the target's wave
            # is not strictly earlier than the source's wave.
            unsatisfied = target_wave is None or source_wave is None or target_wave >= source_wave
            if unsatisfied and proposal.confidence >= 0.5:
                novel_count += 1
        if novel_count > 0:
            return f"{colorize('REVIEW', status_color['REVIEW'])} -- {novel_count} potential dependency(ies) to review"

    # Count logical waves: consecutive sub-waves sharing a parent count once.
    notes = contention_notes or [None] * len(waves)
    logical_count = 0
    has_contention = False
    prev_parent: int | None = None
    for idx in range(len(waves)):
        # The notes list may be shorter than waves; missing entries mean "no note".
        note = notes[idx] if idx < len(notes) else None
        if note is None:
            logical_count += 1
            prev_parent = None
            continue
        has_contention = True
        if prev_parent is None or note.parent_wave_index != prev_parent:
            logical_count += 1
        prev_parent = note.parent_wave_index

    total_issues = sum(len(wave) for wave in waves)
    # Pluralize both nouns so a single-issue sprint reads "1 issue in 1 wave".
    issue_word = "issue" if total_issues == 1 else "issues"
    wave_word = "wave" if logical_count == 1 else "waves"
    suffix = ", overlap serialized" if has_contention else ", all parallelizable"
    if logical_count == 1 and total_issues == 1:
        # Parallelism is meaningless for a single issue.
        suffix = ""

    return f"{colorize('OK', status_color['OK'])} -- {total_issues} {issue_word} in {logical_count} {wave_word}{suffix}"
def _cmd_sprint_show(args: argparse.Namespace, manager: SprintManager) -> int:
    """Show sprint details with dependency visualization.

    Args:
        args: Parsed CLI arguments; reads ``sprint`` and ``skip_analysis`` and
            the optional ``json`` flag.
        manager: Sprint manager used to load the sprint, validate its issue
            IDs and load the issue details; its ``config`` may be falsy.

    Returns:
        1 when the sprint cannot be loaded; the JSON renderer's result when
        ``--json`` is set; otherwise 0.
    """
    logger = Logger()
    sprint = manager.load(args.sprint)
    if not sprint:
        logger.error(f"Sprint not found: {args.sprint}")
        return 1

    # Validate issues
    valid = manager.validate_issues(sprint.issues)
    invalid = set(sprint.issues) - set(valid.keys())

    # Load full IssueInfo objects for dependency analysis
    issue_infos = manager.load_issue_infos(list(valid.keys()))
    dep_graph: DependencyGraph | None = None
    waves: list[list[Any]] = []
    contention_notes: list[WaveContentionNote | None] | None = None
    has_cycles = False

    # Gather all issue IDs on disk to avoid false "nonexistent" warnings
    from little_loops.dependency_mapper import gather_all_issue_ids

    config = manager.config
    issues_dir = config.project_root / config.issues.base_dir if config else Path(".issues")
    all_known_ids = gather_all_issue_ids(issues_dir, config=config)

    # Bind unconditionally: the analysis and rendering steps below read this
    # even when wave computation is skipped (for example when cycles exist).
    dep_config = config.dependency_mapping if config else None

    if issue_infos:
        dep_graph = DependencyGraph.from_issues(issue_infos, all_known_ids=all_known_ids)
        has_cycles = dep_graph.has_cycles()

        if not has_cycles:
            waves = dep_graph.get_execution_waves()
            waves, contention_notes = refine_waves_for_contention(waves, config=dep_config)

    # JSON early-exit
    if getattr(args, "json", False):
        return _show_json(sprint, issue_infos, waves, contention_notes, has_cycles, dep_graph)

    print(f"{colorize('Sprint:', '1')} {sprint.name}")
    if sprint.description:
        print(f"Description: {sprint.description}")
    print(f"Created: {_format_created(sprint.created)}")

    # Options on a single compact line right after metadata
    if sprint.options:
        opts = sprint.options
        print(
            f"Options: max_workers={opts.max_workers}, timeout={opts.timeout}s, max_iterations={opts.max_iterations}"
        )

    # Sprint run state from .sprint-state.json
    _print_run_state(sprint.name)

    # Dependency analysis (ENH-301) - run before health summary so we can reference it
    dep_report: Any = None
    issue_to_wave: dict[str, int] = {}
    if issue_infos and not args.skip_analysis:
        from little_loops.dependency_mapper import analyze_dependencies

        issue_contents = _build_issue_contents(issue_infos)
        dep_report = analyze_dependencies(
            issue_infos, issue_contents, all_known_ids=all_known_ids, config=dep_config
        )

        # Build wave ordering map so we can filter already-satisfied proposals
        for wave_idx, wave in enumerate(waves):
            for issue in wave:
                issue_to_wave[issue.issue_id] = wave_idx

    # Sprint health summary
    if waves:
        health = _render_health_summary(
            waves,
            contention_notes,
            has_cycles,
            invalid,
            dep_report=dep_report,
            issue_to_wave=issue_to_wave if issue_to_wave else None,
        )
        print(f"Sprint health: {health}")

    # Composition breakdown
    _print_composition(issue_infos)

    # Show execution plan if we have dependency info and no cycles
    if waves and dep_graph:
        print(_render_execution_plan(waves, dep_graph, contention_notes, config=dep_config))
        print(_render_dependency_graph(waves, dep_graph))
    else:
        # Fallback to simple list if no valid issues or cycles
        print(f"Issues ({len(sprint.issues)}):")
        for issue_id in sprint.issues:
            status = "valid" if issue_id in valid else "NOT FOUND"
            print(f" - {issue_id} ({status})")

    # Warn about cycles if detected
    if has_cycles and dep_graph:
        cycles = dep_graph.detect_cycles()
        print("\nWarning: Dependency cycles detected:")
        for cycle in cycles:
            print(f" {' -> '.join(cycle)}")

    # Render dependency analysis output
    if dep_report is not None:
        _render_dependency_analysis(
            dep_report,
            logger,
            issue_to_wave=issue_to_wave if issue_to_wave else None,
            config=dep_config,
        )

    if invalid:
        print(f"\nWarning: {len(invalid)} issue(s) not found")

    return 0
272# ---------------------------------------------------------------------------
273# Helper functions for enhanced show output (ENH-923)
274# ---------------------------------------------------------------------------
277def _format_created(iso_str: str) -> str:
278 """Format an ISO 8601 created timestamp as a human-friendly string."""
279 import time
280 from datetime import UTC, datetime
282 try:
283 dt = datetime.fromisoformat(iso_str)
284 if dt.tzinfo is None:
285 dt = dt.replace(tzinfo=UTC)
286 formatted = dt.strftime("%Y-%m-%d %H:%M UTC")
287 elapsed = time.time() - dt.timestamp()
288 if elapsed >= 0:
289 return f"{formatted} ({format_relative_time(elapsed)})"
290 return formatted
291 except (ValueError, OSError):
292 return iso_str
295def _print_composition(issue_infos: list[Any]) -> None:
296 """Print type/priority composition breakdown."""
297 if not issue_infos:
298 return
299 from collections import Counter
301 types: Counter[str] = Counter()
302 priorities: Counter[str] = Counter()
303 for info in issue_infos:
304 issue_type = info.issue_id.split("-", 1)[0]
305 types[issue_type] += 1
306 if info.priority:
307 priorities[info.priority] += 1
309 type_parts = [f"{count} {t}" for t, count in sorted(types.items())]
310 prio_parts = [f"{p}: {count}" for p, count in sorted(priorities.items())]
311 print(f"Composition: {', '.join(type_parts)} | {', '.join(prio_parts)}")
314def _print_run_state(sprint_name: str) -> None:
315 """Print last run state if .sprint-state.json exists for this sprint."""
316 import json
318 state_file = Path.cwd() / ".sprint-state.json"
319 if not state_file.exists():
320 return
321 try:
322 data = json.loads(state_file.read_text())
323 if data.get("sprint_name") != sprint_name:
324 return
325 completed = data.get("completed_issues", [])
326 failed = data.get("failed_issues", {})
327 skipped = data.get("skipped_blocked_issues", {})
328 total = len(completed) + len(failed) + len(skipped)
330 started = data.get("started_at", "")
331 date_str = started[:10] if started else "unknown"
333 parts = [f"{len(completed)} completed"]
334 if failed:
335 failed_ids = ", ".join(sorted(failed.keys()))
336 parts.append(f"{len(failed)} failed ({failed_ids})")
337 if skipped:
338 parts.append(f"{len(skipped)} skipped")
339 print(f"Last run: {date_str} \u2014 {', '.join(parts)} of {total}")
340 except (json.JSONDecodeError, OSError):
341 pass
def _show_json(
    sprint: Any,
    issue_infos: list[Any],
    waves: list[list[Any]],
    contention_notes: Any,
    has_cycles: bool,
    dep_graph: Any,
) -> int:
    """Emit the sprint as a JSON document via ``print_json``.

    The document holds the sprint metadata, one entry per loaded issue and
    one entry per wave (numbered from 1). ``contention_notes`` and
    ``dep_graph`` are accepted but not used in the output.

    Returns:
        Always 0.
    """
    issue_rows = [
        {
            "id": item.issue_id,
            "title": item.title,
            "priority": item.priority,
            "path": str(item.path),
            "confidence_score": item.confidence_score,
            "outcome_confidence": item.outcome_confidence,
        }
        for item in issue_infos
    ]

    wave_rows = [
        {"wave": number, "issues": [member.issue_id for member in group]}
        for number, group in enumerate(waves, start=1)
    ]

    print_json(
        {
            "name": sprint.name,
            # An empty description is emitted as null.
            "description": sprint.description or None,
            "created": sprint.created,
            "issues": issue_rows,
            "waves": wave_rows,
            "has_cycles": has_cycles,
        }
    )
    return 0