Coverage for little_loops / cli / issues / clusters.py: 96%

163 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""ll-issues clusters: Render issue relationship clusters as box diagrams.""" 

2 

3from __future__ import annotations 

4 

5import argparse 

6from collections import deque 

7from typing import TYPE_CHECKING 

8 

9from little_loops.cli.output import colorize, print_json 

10 

11if TYPE_CHECKING: 

12 from little_loops.config import BRConfig 

13 from little_loops.dependency_graph import DependencyGraph 

14 from little_loops.issue_parser import IssueInfo 

15 

# ANSI colour code used for each relationship kind when colouring edge labels.
EDGE_COLOR: dict[str, str] = {
    "blocks": "31",  # red
    "blocked_by": "33",  # yellow
    "parent": "34",  # blue
    "sibling": "36",  # cyan
}

# Diagram geometry, measured in character cells.
_BOX_HEIGHT = 4  # rows per box: top border, two content rows, bottom border
_GAP_HEIGHT = 2  # rows left between stacked boxes for the connector arrow
_BOX_MARGIN = 2  # column offset of every box from the left edge
_MAX_BOX_WIDTH = 60  # upper bound on box width

28 

29 

def _get_connected_components(graph: DependencyGraph, all_ids: set[str]) -> list[list[str]]:
    """Partition issue IDs into connected components of the dependency graph.

    Edges are treated as undirected: two issues are linked when either one
    appears in the other's ``blocks`` or ``blocked_by`` set. Links to IDs
    outside ``all_ids`` are ignored.

    Args:
        graph: Object exposing ``blocks`` and ``blocked_by`` mappings from an
            issue ID to a set of related issue IDs.
        all_ids: The IDs to partition.

    Returns:
        One list per component, members in breadth-first discovery order
        (each search is seeded from the smallest unvisited ID). Components
        are ordered largest first; equal-sized ones keep discovery order.
    """
    seen: set[str] = set()
    found: list[list[str]] = []

    for seed in sorted(all_ids):
        if seed in seen:
            continue
        members: list[str] = []
        frontier: deque[str] = deque((seed,))
        while frontier:
            issue_id = frontier.popleft()
            if issue_id in seen:
                # An ID may be queued several times before its first visit.
                continue
            seen.add(issue_id)
            members.append(issue_id)
            upstream = graph.blocked_by.get(issue_id, set())
            downstream = graph.blocks.get(issue_id, set())
            linked = (upstream | downstream) & all_ids
            # Sorted so traversal order is deterministic.
            frontier.extend(other for other in sorted(linked) if other not in seen)
        found.append(members)

    # list.sort is stable, so ties stay in discovery order.
    found.sort(key=len, reverse=True)
    return found

58 

59 

def _topo_sort_cluster(
    cluster_ids: list[str],
    blocked_by: dict[str, set[str]],
) -> tuple[list[str], bool]:
    """Topologically sort a cluster with Kahn's algorithm.

    Only dependencies whose both ends are in ``cluster_ids`` are considered;
    blockers outside the cluster are ignored.

    Args:
        cluster_ids: Issue IDs belonging to the cluster.
        blocked_by: Mapping from an issue ID to the set of IDs that block it.

    Returns:
        ``(sorted_ids, has_cycle)``. Blockers come before the issues they
        block, with ties broken by sorted ID. When a cycle prevents a full
        ordering, the IDs that could not be placed are appended in sorted
        order after the acyclic prefix, so every ID appears exactly once.
    """
    cluster_set = set(cluster_ids)
    # dict.fromkeys de-duplicates while keeping first-seen order; iterating
    # over it below keeps a repeated ID from being counted twice.
    in_degree: dict[str, int] = dict.fromkeys(cluster_ids, 0)
    adj: dict[str, list[str]] = {id_: [] for id_ in in_degree}

    for id_ in in_degree:
        for dep in sorted(blocked_by.get(id_, set()) & cluster_set):
            in_degree[id_] += 1
            adj[dep].append(id_)

    queue: deque[str] = deque(sorted(id_ for id_, deg in in_degree.items() if deg == 0))
    result: list[str] = []

    while queue:
        node = queue.popleft()
        result.append(node)
        for dep in sorted(adj[node]):
            in_degree[dep] -= 1
            if in_degree[dep] == 0:
                queue.append(dep)

    # Compare against the number of distinct IDs so a duplicated input ID is
    # not mistaken for a cycle.
    has_cycle = len(result) < len(in_degree)
    if has_cycle:
        # Build the membership set once instead of once per candidate.
        placed = set(result)
        result.extend(sorted(id_ for id_ in in_degree if id_ not in placed))

    return result, has_cycle

95 

96 

def _cluster_edges(cluster_ids: set[str], graph: DependencyGraph) -> list[tuple[str, str, str]]:
    """List the "blocks" edges whose both endpoints lie inside a cluster.

    Args:
        cluster_ids: IDs of the issues in the cluster.
        graph: Object exposing a ``blocks`` mapping from an issue ID to the
            set of IDs it blocks.

    Returns:
        ``(from_id, to_id, "blocks")`` tuples, ordered by source ID and then
        by target ID.
    """
    return [
        (source, target, "blocks")
        for source in sorted(cluster_ids)
        for target in sorted(graph.blocks.get(source, set()))
        if target in cluster_ids
    ]

105 

106 

def _render_cluster_diagram(
    ordered_ids: list[str],
    issues_map: dict[str, IssueInfo],
    edge_map: dict[tuple[str, str], str],
    box_width: int,
) -> list[str]:
    """Render a cluster as a vertical stack of boxes joined by arrows.

    Each issue gets one box (priority and ID on the first content line, the
    possibly truncated title on the second), drawn with ``_draw_box`` from
    ``cli/loop/layout.py``. A connector is drawn between two consecutive
    boxes only when ``edge_map`` holds an edge between them in either
    direction. Its coloured label is appended after the grid row is turned
    into a string, so ANSI escapes never occupy grid cells. Edges between
    issues that are not adjacent in ``ordered_ids`` are listed as text lines
    below the diagram.

    Args:
        ordered_ids: Issue IDs in top-to-bottom display order.
        issues_map: Mapping from issue ID to an object with ``title`` and
            ``priority`` attributes; must contain every ID in ``ordered_ids``.
        edge_map: Mapping from ``(from_id, to_id)`` to a relationship name.
            Both IDs of every key must appear in ``ordered_ids``.
        box_width: Width of each box in character cells.

    Returns:
        The diagram as a list of lines with trailing blank lines removed.
    """
    # Intentional cross-module import of a private primitive; _draw_box is
    # reusable and has no FSM-specific logic when is_highlighted=False.
    from little_loops.cli.loop.layout import _draw_box

    n = len(ordered_ids)
    grid_h = n * _BOX_HEIGHT + max(0, n - 1) * _GAP_HEIGHT
    grid_w = box_width + _BOX_MARGIN * 2 + 2
    grid: list[list[str]] = [[" "] * grid_w for _ in range(grid_h)]

    center_col = _BOX_MARGIN + box_width // 2
    box_start: dict[str, int] = {}
    avail = box_width - 4  # interior width minus side margins

    for i, issue_id in enumerate(ordered_ids):
        row = i * (_BOX_HEIGHT + _GAP_HEIGHT)
        issue = issues_map[issue_id]
        # Truncate over-long titles, spending one cell on the ellipsis.
        title = issue.title if len(issue.title) <= avail else issue.title[: avail - 1] + "…"
        content = [f"[{issue.priority}] {issue_id}", title]
        _draw_box(grid, row, _BOX_MARGIN, box_width, _BOX_HEIGHT, content, False, "0")
        box_start[issue_id] = row

    # Annotate gap rows with arrow characters and colored edge labels.
    # Only draw connectors when a real edge exists between consecutive nodes.
    # NOTE(review): the arrow always points down, even when the only edge is
    # the reverse one (b -> a); confirm whether that is intended.
    arrow_labels: dict[int, str] = {}
    for a_id, b_id in zip(ordered_ids, ordered_ids[1:]):
        gap_row = box_start[a_id] + _BOX_HEIGHT

        rel = edge_map.get((a_id, b_id)) or edge_map.get((b_id, a_id))
        if rel:
            if gap_row < grid_h:
                grid[gap_row][center_col] = "│"
            if gap_row + 1 < grid_h:
                grid[gap_row + 1][center_col] = "▼"
            color = EDGE_COLOR.get(rel, "37")
            arrow_labels[gap_row] = colorize(f" {rel}", color)

    # Convert grid to string lines, appending annotations after arrow rows
    lines: list[str] = []
    for r, row_chars in enumerate(grid):
        line = "".join(row_chars).rstrip()
        if r in arrow_labels:
            line += arrow_labels[r]
        lines.append(line)

    while lines and not lines[-1].strip():
        lines.pop()

    # Append annotations for edges between non-adjacent boxes. The distance
    # is taken as an absolute value: in the cycle fallback layout an edge can
    # point backward, and a signed test would silently drop it from the output.
    pos = {id_: i for i, id_ in enumerate(ordered_ids)}
    skip_edges = [
        (f, t, r) for (f, t), r in sorted(edge_map.items()) if abs(pos[t] - pos[f]) > 1
    ]
    if skip_edges:
        lines.append("")
        for src, dst, rel in skip_edges:
            color = EDGE_COLOR.get(rel, "37")
            lines.append(f" {src} {colorize('→', color)} {dst} ({rel})")

    return lines

178 

179 

def cmd_clusters(config: BRConfig, args: argparse.Namespace) -> int:
    """Render issue relationship clusters as box diagrams.

    Builds the dependency graph from the active issues, groups them into
    connected components, applies the optional filters, then prints either
    JSON or one box diagram per cluster followed by a summary line.

    Args:
        config: Project configuration (provides issue directories and CLI settings)
        args: Parsed CLI args (include_orphans: bool, min_connections: int, json: bool).
            Missing attributes fall back to False / 0 / False.

    Returns:
        Exit code (0 = success)
    """
    from little_loops.cli.output import terminal_width
    from little_loops.dependency_graph import DependencyGraph
    from little_loops.dependency_mapper.operations import gather_all_issue_ids
    from little_loops.issue_parser import find_issues

    issues = find_issues(config)
    if not issues:
        print("No active issues found.")
        return 0

    issues_dir = config.project_root / config.issues.base_dir
    all_known_ids = gather_all_issue_ids(issues_dir, config=config)
    graph = DependencyGraph.from_issues(issues, all_known_ids=all_known_ids)
    all_ids = set(graph.issues.keys())
    # Keep the unfiltered result: it is needed again to choose the message
    # when the filters remove everything, so it need not be recomputed.
    all_components = _get_connected_components(graph, all_ids)
    components = all_components

    include_orphans: bool = getattr(args, "include_orphans", False)
    if not include_orphans:
        components = [c for c in components if len(c) > 1]

    min_conn: int = getattr(args, "min_connections", 0) or 0
    if min_conn > 0:

        def _max_degree(comp: list[str]) -> int:
            # Largest number of distinct neighbours of any issue in the cluster.
            return max(
                len(graph.blocked_by.get(id_, set()) | graph.blocks.get(id_, set())) for id_ in comp
            )

        components = [c for c in components if _max_degree(c) >= min_conn]

    if not components:
        # NOTE(review): this message is printed as plain text even when JSON
        # output was requested; confirm whether JSON consumers expect "[]".
        if all(len(c) == 1 for c in all_components):
            print("No issue relationships found. Use --include-orphans to show isolated issues.")
        else:
            print("No clusters match the specified filters.")
        return 0

    # JSON mode: emit structured data, no diagram rendering
    if getattr(args, "json", False):
        output = []
        for idx, comp in enumerate(components, 1):
            comp_set = set(comp)
            edges = _cluster_edges(comp_set, graph)
            output.append(
                {
                    "cluster_index": idx,
                    "issue_count": len(comp),
                    "issues": [
                        {
                            "id": id_,
                            "priority": graph.issues[id_].priority,
                            "title": graph.issues[id_].title,
                        }
                        for id_ in sorted(comp)
                    ],
                    "edges": [{"from": f, "to": t, "relationship": r} for f, t, r in edges],
                }
            )
        print_json(output)
        return 0

    # Text rendering: fit boxes to the terminal, clamped to [20, _MAX_BOX_WIDTH].
    width = terminal_width()
    box_w = max(20, min(_MAX_BOX_WIDTH, width - _BOX_MARGIN * 2 - 4))
    issues_map = graph.issues
    total_issues = sum(len(c) for c in components)

    for idx, comp in enumerate(components, 1):
        comp_set = set(comp)
        edges = _cluster_edges(comp_set, graph)
        edge_map: dict[tuple[str, str], str] = {(f, t): r for f, t, r in edges}

        ordered, has_cycle = _topo_sort_cluster(comp, graph.blocked_by)

        sep = "─" * 3
        n_issues = len(comp)
        noun = "issue" if n_issues == 1 else "issues"
        print(f"{sep} Cluster {idx} ({n_issues} {noun}) {sep}")
        if has_cycle:
            print("⚠ cycle detected — using fallback layout")

        diagram_lines = _render_cluster_diagram(ordered, issues_map, edge_map, box_w)
        print("\n".join(diagram_lines))
        print()

    n_clusters = len(components)
    cluster_noun = "cluster" if n_clusters == 1 else "clusters"
    issue_noun = "issue" if total_issues == 1 else "issues"
    print(f"{n_clusters} {cluster_noun}, {total_issues} {issue_noun} total")
    return 0