Coverage for little_loops / cli / issues / clusters.py: 96%
163 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""ll-issues clusters: Render issue relationship clusters as box diagrams."""
3from __future__ import annotations
5import argparse
6from collections import deque
7from typing import TYPE_CHECKING
9from little_loops.cli.output import colorize, print_json
11if TYPE_CHECKING:
12 from little_loops.config import BRConfig
13 from little_loops.dependency_graph import DependencyGraph
14 from little_loops.issue_parser import IssueInfo
16# ANSI color codes per relationship type
17EDGE_COLOR: dict[str, str] = {
18 "blocks": "31", # red
19 "blocked_by": "33", # yellow
20 "parent": "34", # blue
21 "sibling": "36", # cyan
22}
24_BOX_HEIGHT = 4 # top border + 2 content lines + bottom border
25_GAP_HEIGHT = 2 # rows between boxes for arrow drawing
26_BOX_MARGIN = 2 # left-margin column offset
27_MAX_BOX_WIDTH = 60
30def _get_connected_components(graph: DependencyGraph, all_ids: set[str]) -> list[list[str]]:
31 """BFS over undirected dependency graph to find connected components.
33 Returns components sorted by size descending.
34 """
35 visited: set[str] = set()
36 components: list[list[str]] = []
38 for node in sorted(all_ids):
39 if node in visited:
40 continue
41 component: list[str] = []
42 queue: deque[str] = deque([node])
43 while queue:
44 current = queue.popleft()
45 if current in visited:
46 continue
47 visited.add(current)
48 component.append(current)
49 neighbors = (
50 graph.blocked_by.get(current, set()) | graph.blocks.get(current, set())
51 ) & all_ids
52 for neighbor in sorted(neighbors):
53 if neighbor not in visited:
54 queue.append(neighbor)
55 components.append(component)
57 return sorted(components, key=len, reverse=True)
60def _topo_sort_cluster(
61 cluster_ids: list[str],
62 blocked_by: dict[str, set[str]],
63) -> tuple[list[str], bool]:
64 """Topological sort (Kahn's) scoped to this cluster.
66 Returns (sorted_ids, has_cycle). Nodes in cycles are appended in sorted
67 order after the acyclic prefix so the caller always gets a full list.
68 """
69 cluster_set = set(cluster_ids)
70 in_degree: dict[str, int] = dict.fromkeys(cluster_ids, 0)
71 adj: dict[str, list[str]] = {id_: [] for id_ in cluster_ids}
73 for id_ in cluster_ids:
74 for dep in sorted(blocked_by.get(id_, set()) & cluster_set):
75 in_degree[id_] += 1
76 adj[dep].append(id_)
78 queue: deque[str] = deque(sorted(id_ for id_, deg in in_degree.items() if deg == 0))
79 result: list[str] = []
81 while queue:
82 node = queue.popleft()
83 result.append(node)
84 for dep in sorted(adj[node]):
85 in_degree[dep] -= 1
86 if in_degree[dep] == 0:
87 queue.append(dep)
89 has_cycle = len(result) < len(cluster_ids)
90 if has_cycle:
91 remaining = sorted(id_ for id_ in cluster_ids if id_ not in set(result))
92 result.extend(remaining)
94 return result, has_cycle
97def _cluster_edges(cluster_ids: set[str], graph: DependencyGraph) -> list[tuple[str, str, str]]:
98 """Return directed edges within a cluster as (from_id, to_id, relationship)."""
99 edges: list[tuple[str, str, str]] = []
100 for id_ in sorted(cluster_ids):
101 for blocked_id in sorted(graph.blocks.get(id_, set())):
102 if blocked_id in cluster_ids:
103 edges.append((id_, blocked_id, "blocks"))
104 return edges
107def _render_cluster_diagram(
108 ordered_ids: list[str],
109 issues_map: dict[str, IssueInfo],
110 edge_map: dict[tuple[str, str], str],
111 box_width: int,
112) -> list[str]:
113 """Render a cluster as a vertical stack of box diagrams with arrows.
115 Uses _draw_box from cli/loop/layout.py as the box primitive.
116 Arrows are drawn between consecutive nodes; edge labels are appended
117 after the arrow row so ANSI escapes don't corrupt the character grid.
118 """
119 # Intentional cross-module import of a private primitive; _draw_box is
120 # reusable and has no FSM-specific logic when is_highlighted=False.
121 from little_loops.cli.loop.layout import _draw_box
123 n = len(ordered_ids)
124 grid_h = n * _BOX_HEIGHT + max(0, n - 1) * _GAP_HEIGHT
125 grid_w = box_width + _BOX_MARGIN * 2 + 2
126 grid: list[list[str]] = [[" "] * grid_w for _ in range(grid_h)]
128 center_col = _BOX_MARGIN + box_width // 2
129 box_start: dict[str, int] = {}
130 avail = box_width - 4 # interior width minus side margins
132 for i, issue_id in enumerate(ordered_ids):
133 row = i * (_BOX_HEIGHT + _GAP_HEIGHT)
134 issue = issues_map[issue_id]
135 title = issue.title if len(issue.title) <= avail else issue.title[: avail - 1] + "…"
136 content = [f"[{issue.priority}] {issue_id}", title]
137 _draw_box(grid, row, _BOX_MARGIN, box_width, _BOX_HEIGHT, content, False, "0")
138 box_start[issue_id] = row
140 # Annotate gap rows with arrow characters and colored edge labels.
141 # Only draw connectors when a real edge exists between consecutive nodes.
142 arrow_labels: dict[int, str] = {}
143 for i in range(n - 1):
144 a_id = ordered_ids[i]
145 b_id = ordered_ids[i + 1]
146 gap_row = box_start[a_id] + _BOX_HEIGHT
148 rel = edge_map.get((a_id, b_id)) or edge_map.get((b_id, a_id))
149 if rel:
150 if gap_row < grid_h:
151 grid[gap_row][center_col] = "│"
152 if gap_row + 1 < grid_h:
153 grid[gap_row + 1][center_col] = "▼"
154 color = EDGE_COLOR.get(rel, "37")
155 arrow_labels[gap_row] = colorize(f" {rel}", color)
157 # Convert grid to string lines, appending annotations after arrow rows
158 lines: list[str] = []
159 for r, row_chars in enumerate(grid):
160 line = "".join(row_chars).rstrip()
161 if r in arrow_labels:
162 line += arrow_labels[r]
163 lines.append(line)
165 while lines and not lines[-1].strip():
166 lines.pop()
168 # Append annotations for skip-level edges (non-consecutive in topo order).
169 pos = {id_: i for i, id_ in enumerate(ordered_ids)}
170 skip_edges = [(f, t, r) for (f, t), r in sorted(edge_map.items()) if pos[t] - pos[f] > 1]
171 if skip_edges:
172 lines.append("")
173 for src, dst, rel in skip_edges:
174 color = EDGE_COLOR.get(rel, "37")
175 lines.append(f" {src} {colorize('→', color)} {dst} ({rel})")
177 return lines
180def cmd_clusters(config: BRConfig, args: argparse.Namespace) -> int:
181 """Render issue relationship clusters as box diagrams.
183 Args:
184 config: Project configuration (provides issue directories and CLI settings)
185 args: Parsed CLI args (include_orphans: bool, min_connections: int, json: bool)
187 Returns:
188 Exit code (0 = success)
189 """
190 from little_loops.cli.output import terminal_width
191 from little_loops.dependency_graph import DependencyGraph
192 from little_loops.dependency_mapper.operations import gather_all_issue_ids
193 from little_loops.issue_parser import find_issues
195 issues = find_issues(config)
196 if not issues:
197 print("No active issues found.")
198 return 0
200 issues_dir = config.project_root / config.issues.base_dir
201 all_known_ids = gather_all_issue_ids(issues_dir, config=config)
202 graph = DependencyGraph.from_issues(issues, all_known_ids=all_known_ids)
203 all_ids = set(graph.issues.keys())
204 components = _get_connected_components(graph, all_ids)
206 include_orphans: bool = getattr(args, "include_orphans", False)
207 if not include_orphans:
208 components = [c for c in components if len(c) > 1]
210 min_conn: int = getattr(args, "min_connections", 0) or 0
211 if min_conn > 0:
213 def _max_degree(comp: list[str]) -> int:
214 return max(
215 len(graph.blocked_by.get(id_, set()) | graph.blocks.get(id_, set())) for id_ in comp
216 )
218 components = [c for c in components if _max_degree(c) >= min_conn]
220 if not components:
221 all_components = _get_connected_components(graph, all_ids)
222 if all(len(c) == 1 for c in all_components):
223 print("No issue relationships found. Use --include-orphans to show isolated issues.")
224 else:
225 print("No clusters match the specified filters.")
226 return 0
228 # JSON mode: emit structured data, no diagram rendering
229 if getattr(args, "json", False):
230 output = []
231 for idx, comp in enumerate(components, 1):
232 comp_set = set(comp)
233 edges = _cluster_edges(comp_set, graph)
234 output.append(
235 {
236 "cluster_index": idx,
237 "issue_count": len(comp),
238 "issues": [
239 {
240 "id": id_,
241 "priority": graph.issues[id_].priority,
242 "title": graph.issues[id_].title,
243 }
244 for id_ in sorted(comp)
245 ],
246 "edges": [{"from": f, "to": t, "relationship": r} for f, t, r in edges],
247 }
248 )
249 print_json(output)
250 return 0
252 # Text rendering
253 width = terminal_width()
254 box_w = max(20, min(_MAX_BOX_WIDTH, width - _BOX_MARGIN * 2 - 4))
255 issues_map = graph.issues
256 total_issues = sum(len(c) for c in components)
258 for idx, comp in enumerate(components, 1):
259 comp_set = set(comp)
260 edges = _cluster_edges(comp_set, graph)
261 edge_map: dict[tuple[str, str], str] = {(f, t): r for f, t, r in edges}
263 ordered, has_cycle = _topo_sort_cluster(comp, graph.blocked_by)
265 sep = "─" * 3
266 n_issues = len(comp)
267 noun = "issue" if n_issues == 1 else "issues"
268 print(f"{sep} Cluster {idx} ({n_issues} {noun}) {sep}")
269 if has_cycle:
270 print("⚠ cycle detected — using fallback layout")
272 diagram_lines = _render_cluster_diagram(ordered, issues_map, edge_map, box_w)
273 print("\n".join(diagram_lines))
274 print()
276 n_clusters = len(components)
277 cluster_noun = "cluster" if n_clusters == 1 else "clusters"
278 issue_noun = "issue" if total_issues == 1 else "issues"
279 print(f"{n_clusters} {cluster_noun}, {total_issues} {issue_noun} total")
280 return 0