Coverage for little_loops / cli / loop / layout.py: 92%

1030 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""FSM diagram layout engine. 

2 

3Implements adaptive layout for FSM diagrams using a Sugiyama-inspired 

4layered graph drawing approach. Supports topology detection, vertical 

5linear chains, side-by-side branching, and margin back-edge rendering. 

6 

7Extracted from info.py and extended with adaptive layout capabilities. 

8""" 

9 

10from __future__ import annotations 

11 

12import re 

13from collections import deque 

14 

15from wcwidth import wcswidth as _wcswidth 

16from wcwidth import wcwidth as _wcwidth 

17 

18from little_loops.cli.output import colorize, terminal_width 

19from little_loops.fsm.schema import FSMLoop, StateConfig 

20 

21_ANSI_ESCAPE_RE = re.compile(r"\033\[[0-9;]*m") 

22 

23# --------------------------------------------------------------------------- 

24# Edge label colorization 

25# --------------------------------------------------------------------------- 

26 

27_EDGE_LABEL_COLORS: dict[str, str] = { 

28 "yes": "32", 

29 "no": "38;5;208", 

30 "error": "31", 

31 "partial": "33", 

32 "next": "2", 

33 "_": "2", 

34 "blocked": "31", 

35 "retry_exhausted": "38;5;208", 

36 "rate_limit_exhausted": "38;5;214", 

37 "throttle_hard": "38;5;196", 

38} 

39 

40 

41def _colorize_label(label: str) -> str: 

42 """Colorize a (possibly compound) edge label like 'no/error'.""" 

43 parts = label.split("/") 

44 code = "" 

45 for part in parts: 

46 if part in ("no", "error"): 

47 code = _EDGE_LABEL_COLORS["no"] 

48 break 

49 if part == "partial" and not code: 

50 code = _EDGE_LABEL_COLORS["partial"] 

51 elif part == "yes" and not code: 

52 code = _EDGE_LABEL_COLORS["yes"] 

53 elif part in ("next", "_") and not code: 

54 code = _EDGE_LABEL_COLORS["next"] 

55 return colorize(label, code) if code else label 

56 

57 

58def _edge_line_color(label: str) -> str: 

59 """Return the ANSI SGR code to use for connector characters of an edge. 

60 

61 Applies the same priority cascade as ``_colorize_label`` so that line 

62 characters (│, ─, ▼, ▶, corners) match the semantic color of their label. 

63 Returns an empty string when no color applies (callers treat this as "no-op"). 

64 """ 

65 parts = label.split("/") 

66 code = "" 

67 for part in parts: 

68 if part in ( 

69 "no", 

70 "error", 

71 "blocked", 

72 "retry_exhausted", 

73 "rate_limit_exhausted", 

74 "throttle_hard", 

75 ): 

76 return _EDGE_LABEL_COLORS.get(part, "31") 

77 if part == "partial" and not code: 

78 code = _EDGE_LABEL_COLORS["partial"] 

79 elif part == "yes" and not code: 

80 code = _EDGE_LABEL_COLORS["yes"] 

81 elif part in ("next", "_") and not code: 

82 code = _EDGE_LABEL_COLORS["next"] 

83 return code 

84 

85 

86def _colorize_diagram_labels(diagram: str, colors: dict[str, str] | None = None) -> str: 

87 """Apply ANSI color to known edge labels in a rendered diagram string. 

88 

89 Labels are colorized only when bounded by box-drawing or whitespace chars 

90 to avoid coloring partial matches inside state names. 

91 

92 Args: 

93 colors: Optional label→SGR-code mapping; falls back to _EDGE_LABEL_COLORS if None. 

94 """ 

95 label_colors = colors if colors is not None else _EDGE_LABEL_COLORS 

96 for label, code in label_colors.items(): 

97 colored = colorize(label, code) 

98 diagram = re.sub( 

99 r"(?<=[─ │▶\n])" + re.escape(label) + r"(?=[─ │▶\n])", 

100 colored, 

101 diagram, 

102 ) 

103 return diagram 

104 

105 

106# --------------------------------------------------------------------------- 

107# State box badge definitions 

108# --------------------------------------------------------------------------- 

109 

110_ACTION_TYPE_BADGES: dict[str, str] = { 

111 "prompt": "\u2726", # ✦ 

112 "slash_command": "/\u2501\u25ba", # /━► 

113 "shell": "\u276f_", # ❯_ 

114 "mcp_tool": "\u26a1", # ⚡ 

115} 

116 

117_SUB_LOOP_BADGE = "\u21b3\u27f3" # ↳⟳ 

118_ROUTE_BADGE = "\u2443" # ⑃ 

119 

120 

121def _badge_display_width(badge: str) -> int: 

122 """Compute terminal display width of a badge string using wcwidth.""" 

123 w = _wcswidth(badge) 

124 return w if w >= 0 else len(badge) 

125 

126 

127def _get_state_badge(state: StateConfig | None, badges: dict[str, str] | None = None) -> str: 

128 """Return the unicode badge string for a state, or '' if none.""" 

129 if state is None: 

130 return "" 

131 effective = {**_ACTION_TYPE_BADGES, **(badges or {})} 

132 sub_loop_badge = (badges or {}).get("sub_loop", _SUB_LOOP_BADGE) 

133 route_badge = (badges or {}).get("route", _ROUTE_BADGE) 

134 if state.loop is not None: 

135 return sub_loop_badge 

136 if state.action_type: 

137 return effective.get(state.action_type, f"[{state.action_type}]") 

138 if state.action: 

139 return effective["shell"] 

140 if state.route is not None: 

141 return route_badge 

142 return "" 

143 

144 

145# --------------------------------------------------------------------------- 

146# Box content helpers for multi-row diagram boxes 

147# --------------------------------------------------------------------------- 

148 

149 

150def _box_inner_lines( 

151 state: StateConfig | None, 

152 display_label: str, 

153 verbose: bool, 

154 inner_width: int, 

155) -> list[str]: 

156 """Return interior lines for a state box (between top and bottom borders). 

157 

158 The first line is always ``display_label`` + type badge (if any). 

159 Subsequent lines are action content lines. All lines fit within 

160 ``inner_width`` characters (content is truncated or wrapped accordingly). 

161 """ 

162 # Badge is now rendered in the top-right corner by _draw_box; name row is label only 

163 name_line = display_label[:inner_width] 

164 

165 lines: list[str] = [name_line] 

166 

167 # Action lines 

168 if state and state.action: 

169 action_src = [ln.rstrip() for ln in state.action.strip().splitlines()] 

170 if verbose: 

171 for src in action_src: 

172 if not src: 

173 continue 

174 # Wrap long lines to inner_width 

175 while len(src) > inner_width: 

176 lines.append(src[:inner_width]) 

177 src = src[inner_width:] 

178 if src: 

179 lines.append(src) 

180 else: 

181 # Show first non-empty line, truncated 

182 first = next((ln for ln in action_src if ln), "") 

183 if len(first) > inner_width: 

184 first = first[: inner_width - 1] + "\u2026" 

185 if first: 

186 lines.append(first) 

187 

188 return lines 

189 

190 

191# --------------------------------------------------------------------------- 

192# Topology detection 

193# --------------------------------------------------------------------------- 

194 

195 

196def _collect_edges(fsm: FSMLoop) -> list[tuple[str, str, str]]: 

197 """Collect all (source, target, label) edges from an FSM.""" 

198 edges: list[tuple[str, str, str]] = [] 

199 for name, state in fsm.states.items(): 

200 if state.on_yes: 

201 edges.append((name, state.on_yes, "yes")) 

202 if state.on_no: 

203 edges.append((name, state.on_no, "no")) 

204 if state.on_error: 

205 edges.append((name, state.on_error, "error")) 

206 if state.on_partial: 

207 edges.append((name, state.on_partial, "partial")) 

208 if state.on_blocked: 

209 edges.append((name, state.on_blocked, "blocked")) 

210 if state.on_retry_exhausted: 

211 edges.append((name, state.on_retry_exhausted, "retry_exhausted")) 

212 if state.on_rate_limit_exhausted: 

213 edges.append((name, state.on_rate_limit_exhausted, "rate_limit_exhausted")) 

214 if state.on_throttle_hard: 

215 edges.append((name, state.on_throttle_hard, "throttle_hard")) 

216 if state.next: 

217 edges.append((name, state.next, "next")) 

218 if state.route: 

219 for verdict, target in state.route.routes.items(): 

220 edges.append((name, target, verdict)) 

221 if state.route.default: 

222 edges.append((name, state.route.default, "_")) 

223 for verdict, target in state.extra_routes.items(): 

224 edges.append((name, target, verdict)) 

225 return edges 

226 

227 

228def _bfs_order(initial: str, edges: list[tuple[str, str, str]]) -> tuple[list[str], dict[str, int]]: 

229 """BFS from initial state. Returns (order, depth_map).""" 

230 order: list[str] = [] 

231 depth: dict[str, int] = {initial: 0} 

232 queue: deque[str] = deque([initial]) 

233 while queue: 

234 node = queue.popleft() 

235 order.append(node) 

236 for src, dst, _ in edges: 

237 if src == node and dst not in depth: 

238 depth[dst] = depth[node] + 1 

239 queue.append(dst) 

240 return order, depth 

241 

242 

243def _trace_main_path( 

244 fsm: FSMLoop, edges: list[tuple[str, str, str]] 

245) -> tuple[list[str], set[tuple[str, str]]]: 

246 """Trace the main (happy) path through the FSM.""" 

247 visited: set[str] = set() 

248 main_path: list[str] = [] 

249 main_edge_set: set[tuple[str, str]] = set() 

250 current = fsm.initial 

251 while current and current not in visited: 

252 visited.add(current) 

253 main_path.append(current) 

254 st = fsm.states.get(current) 

255 if not st or st.terminal: 

256 break 

257 nxt: str = st.on_yes or st.next or "" 

258 if not nxt and st.route: 

259 nxt = next(iter(st.route.routes.values()), "") or st.route.default or "" 

260 if nxt: 

261 main_edge_set.add((current, nxt)) 

262 current = nxt 

263 else: 

264 break 

265 return main_path, main_edge_set 

266 

267 

268def _classify_edges( 

269 edges: list[tuple[str, str, str]], 

270 main_edge_set: set[tuple[str, str]], 

271 bfs_order: list[str], 

272) -> tuple[list[tuple[str, str, str]], list[tuple[str, str, str]]]: 

273 """Split non-main edges into branches and back_edges.""" 

274 main_consumed: set[int] = set() 

275 for src, dst in main_edge_set: 

276 for i, (s, d, _) in enumerate(edges): 

277 if s == src and d == dst and i not in main_consumed: 

278 main_consumed.add(i) 

279 break 

280 

281 branches: list[tuple[str, str, str]] = [] 

282 back_edges: list[tuple[str, str, str]] = [] 

283 for i, (src, dst, label) in enumerate(edges): 

284 if i in main_consumed: 

285 continue 

286 src_pos = bfs_order.index(src) if src in bfs_order else len(bfs_order) 

287 dst_pos = bfs_order.index(dst) if dst in bfs_order else len(bfs_order) 

288 if dst == src or dst_pos < src_pos: 

289 back_edges.append((src, dst, label)) 

290 else: 

291 branches.append((src, dst, label)) 

292 return branches, back_edges 

293 

294 

295class TopologyDetector: 

296 """Classify FSM graph topology for layout strategy selection.""" 

297 

298 def __init__( 

299 self, 

300 edges: list[tuple[str, str, str]], 

301 main_path: list[str], 

302 branches: list[tuple[str, str, str]], 

303 back_edges: list[tuple[str, str, str]], 

304 ) -> None: 

305 self._edges = edges 

306 self._main_path = main_path 

307 self._branches = branches 

308 self._back_edges = back_edges 

309 

310 def classify(self) -> str: 

311 """Return 'linear', 'tree', or 'general'. 

312 

313 Linear: main path only, no non-self branches, only self-loop back-edges. 

314 Tree: branches exist but no fan-in (every non-initial state has ≤1 incoming). 

315 General: everything else (full Sugiyama needed). 

316 """ 

317 non_self_branches = [b for b in self._branches if b[0] != b[1]] 

318 non_self_back = [b for b in self._back_edges if b[0] != b[1]] 

319 

320 if not non_self_branches and not non_self_back: 

321 return "linear" 

322 

323 # Check for fan-in: any state with >1 incoming forward edge 

324 in_count: dict[str, int] = {} 

325 for _, dst, _ in self._edges: 

326 # Only count forward edges (not back-edges) 

327 in_count[dst] = in_count.get(dst, 0) + 1 

328 

329 if not non_self_back and all(v <= 1 for v in in_count.values()): 

330 return "tree" 

331 

332 return "general" 

333 

334 

335# --------------------------------------------------------------------------- 

336# Sugiyama layout pipeline 

337# --------------------------------------------------------------------------- 

338 

339 

340class LayerAssigner: 

341 """Assign nodes to layers using longest-path + width constraint.""" 

342 

343 def __init__( 

344 self, 

345 all_states: list[str], 

346 edges: list[tuple[str, str, str]], 

347 back_edge_set: set[tuple[str, str]], 

348 initial: str, 

349 max_width: int = 4, 

350 ) -> None: 

351 self._all_states = all_states 

352 self._edges = edges 

353 self._back_edge_set = back_edge_set 

354 self._initial = initial 

355 self._max_width = max_width 

356 

357 def assign(self) -> list[list[str]]: 

358 """Return list of layers, each a list of state names (top to bottom).""" 

359 # Build adjacency (forward edges only) 

360 forward: dict[str, list[str]] = {s: [] for s in self._all_states} 

361 reverse: dict[str, list[str]] = {s: [] for s in self._all_states} 

362 seen_edges: set[tuple[str, str]] = set() 

363 for src, dst, _ in self._edges: 

364 if (src, dst) in self._back_edge_set or src == dst: 

365 continue 

366 if src in forward and dst in forward and (src, dst) not in seen_edges: 

367 forward[src].append(dst) 

368 reverse[dst].append(src) 

369 seen_edges.add((src, dst)) 

370 

371 # Longest-path layer assignment (topological order) 

372 layer_of: dict[str, int] = {} 

373 

374 # Kahn's algorithm for topological order 

375 in_degree = {s: len(reverse[s]) for s in self._all_states} 

376 queue: deque[str] = deque() 

377 for s in self._all_states: 

378 if in_degree[s] == 0: 

379 queue.append(s) 

380 

381 topo_order: list[str] = [] 

382 while queue: 

383 node = queue.popleft() 

384 topo_order.append(node) 

385 for dst in forward[node]: 

386 in_degree[dst] -= 1 

387 if in_degree[dst] == 0: 

388 queue.append(dst) 

389 

390 # Handle nodes not reached by topo sort (cycles in forward graph) 

391 for s in self._all_states: 

392 if s not in set(topo_order): 

393 topo_order.append(s) 

394 

395 # Assign layers: longest path from root 

396 for node in topo_order: 

397 if not reverse[node]: 

398 layer_of[node] = 0 

399 else: 

400 layer_of[node] = max( 

401 (layer_of.get(p, 0) + 1 for p in reverse[node]), 

402 default=0, 

403 ) 

404 

405 # Ensure initial state is at layer 0 

406 if self._initial in layer_of and layer_of[self._initial] != 0: 

407 offset = layer_of[self._initial] 

408 for s in layer_of: 

409 layer_of[s] -= offset 

410 

411 # Build layers list 

412 max_layer = max(layer_of.values(), default=0) 

413 layers: list[list[str]] = [[] for _ in range(max_layer + 1)] 

414 for s in self._all_states: 

415 layer = layer_of.get(s, 0) 

416 layers[layer].append(s) 

417 

418 # Width constraint: if any layer exceeds max_width, split 

419 if self._max_width > 0: 

420 new_layers: list[list[str]] = [] 

421 for grp in layers: 

422 remaining = list(grp) 

423 while len(remaining) > self._max_width: 

424 new_layers.append(remaining[: self._max_width]) 

425 remaining = remaining[self._max_width :] 

426 if remaining: 

427 new_layers.append(remaining) 

428 layers = new_layers 

429 

430 return layers 

431 

432 

433class CrossingMinimizer: 

434 """Minimize edge crossings using barycenter heuristic.""" 

435 

436 def __init__( 

437 self, 

438 layers: list[list[str]], 

439 edges: list[tuple[str, str, str]], 

440 back_edge_set: set[tuple[str, str]], 

441 ) -> None: 

442 self._layers = layers 

443 self._edges = edges 

444 self._back_edge_set = back_edge_set 

445 

446 def minimize(self) -> list[list[str]]: 

447 """Return reordered layers with reduced crossings.""" 

448 # Build position lookup 

449 layer_of: dict[str, int] = {} 

450 for i, layer in enumerate(self._layers): 

451 for s in layer: 

452 layer_of[s] = i 

453 

454 # Forward adjacency (non-back, non-self) 

455 adj_down: dict[str, list[str]] = {} 

456 adj_up: dict[str, list[str]] = {} 

457 for src, dst, _ in self._edges: 

458 if (src, dst) in self._back_edge_set or src == dst: 

459 continue 

460 if src in layer_of and dst in layer_of: 

461 adj_down.setdefault(src, []).append(dst) 

462 adj_up.setdefault(dst, []).append(src) 

463 

464 layers = [list(layer) for layer in self._layers] 

465 

466 # 3 sweeps: down, up, down 

467 for sweep in range(3): 

468 if sweep % 2 == 0: 

469 # Top-down sweep 

470 for i in range(1, len(layers)): 

471 pos_above = {s: j for j, s in enumerate(layers[i - 1])} 

472 bary: dict[str, float] = {} 

473 for s in layers[i]: 

474 parents = [p for p in adj_up.get(s, []) if p in pos_above] 

475 if parents: 

476 bary[s] = sum(pos_above[p] for p in parents) / len(parents) 

477 else: 

478 bary[s] = float(layers[i].index(s)) 

479 layers[i].sort(key=lambda s: bary.get(s, 0)) 

480 else: 

481 # Bottom-up sweep 

482 for i in range(len(layers) - 2, -1, -1): 

483 pos_below = {s: j for j, s in enumerate(layers[i + 1])} 

484 bary_up: dict[str, float] = {} 

485 for s in layers[i]: 

486 children = [c for c in adj_down.get(s, []) if c in pos_below] 

487 if children: 

488 bary_up[s] = sum(pos_below[c] for c in children) / len(children) 

489 else: 

490 bary_up[s] = float(layers[i].index(s)) 

491 layers[i].sort(key=lambda s: bary_up.get(s, 0)) 

492 

493 return layers 

494 

495 

496# --------------------------------------------------------------------------- 

497# Rendering helpers 

498# --------------------------------------------------------------------------- 

499 

500 

501def _compute_display_labels( 

502 all_states: list[str], 

503 initial: str, 

504 terminal_states: set[str], 

505) -> dict[str, str]: 

506 """Compute display labels with → prefix and ◉ suffix.""" 

507 display_label: dict[str, str] = {} 

508 for s in all_states: 

509 label = s 

510 if s == initial: 

511 label = "\u2192 " + label 

512 if s in terminal_states: 

513 label = label + " \u25c9" 

514 display_label[s] = label 

515 return display_label 

516 

517 

518def _compute_box_sizes( 

519 all_states: list[str], 

520 display_label: dict[str, str], 

521 fsm_states: dict[str, StateConfig] | None, 

522 verbose: bool, 

523 max_box_inner: int, 

524 badges: dict[str, str] | None = None, 

525) -> tuple[dict[str, list[str]], dict[str, int], dict[str, int], dict[str, str]]: 

526 """Compute box content, widths, and heights for all states. 

527 

528 Returns (box_inner, box_width, box_height, box_badge). 

529 """ 

530 box_inner: dict[str, list[str]] = {} 

531 box_width: dict[str, int] = {} 

532 box_badge: dict[str, str] = {} 

533 

534 for s in all_states: 

535 state_obj = fsm_states.get(s) if fsm_states else None 

536 

537 badge = _get_state_badge(state_obj, badges) 

538 badge_w = _badge_display_width(badge) if badge else 0 

539 box_badge[s] = badge 

540 

541 # Width must fit: name label on content row, badge on top border (with one space 

542 # of padding on each side: " badge ") 

543 base_w = max(len(display_label[s]), badge_w + 2 if badge_w else 0) 

544 

545 inner_w = base_w 

546 if state_obj and state_obj.action and max_box_inner > 0: 

547 action_lines = state_obj.action.strip().splitlines() 

548 if verbose: 

549 max_action_w = max( 

550 (len(ln.rstrip()) for ln in action_lines if ln.rstrip()), default=0 

551 ) 

552 inner_w = max(base_w, min(max_action_w, max_box_inner)) 

553 else: 

554 first_action = next((ln.rstrip() for ln in action_lines if ln.rstrip()), "") 

555 inner_w = max(base_w, min(len(first_action), max_box_inner)) 

556 

557 content = _box_inner_lines(state_obj, display_label[s], verbose, inner_w) 

558 actual_w = max(len(ln) for ln in content) 

559 inner_w = max(inner_w, actual_w) 

560 box_inner[s] = content 

561 box_width[s] = inner_w + 4 # "│ " + content + " │" 

562 

563 box_height: dict[str, int] = {s: len(box_inner[s]) + 2 for s in all_states} 

564 return box_inner, box_width, box_height, box_badge 

565 

566 

567def _draw_box( 

568 grid: list[list[str]], 

569 row: int, 

570 col: int, 

571 width: int, 

572 height: int, 

573 content: list[str], 

574 is_highlighted: bool, 

575 highlight_color: str, 

576 badge: str = "", 

577) -> None: 

578 """Draw a state box onto a character grid at (row, col). 

579 

580 If *badge* is provided it is placed right-aligned in the top border row with 

581 one space of padding on each side (``─ badge ┐``), colorized via ``_bc()``. 

582 """ 

583 total_width = len(grid[0]) if grid else 0 

584 

585 def _bc(ch: str) -> str: 

586 return colorize(ch, highlight_color) if is_highlighted else ch 

587 

588 # Top border: ┌ ─ ─ … ─ ┐ 

589 if col < total_width: 

590 grid[row][col] = _bc("\u250c") 

591 for j in range(1, width - 1): 

592 if col + j < total_width: 

593 grid[row][col + j] = _bc("\u2500") 

594 if col + width - 1 < total_width: 

595 grid[row][col + width - 1] = _bc("\u2510") 

596 

597 # Overlay badge in top-right corner with one space of padding on each side: " badge ┐" 

598 if badge: 

599 badge_w = _badge_display_width(badge) 

600 # Trailing space between badge end and ┐ 

601 trail_pos = col + width - 2 

602 if col + 1 <= trail_pos < col + width - 1 and trail_pos < total_width: 

603 grid[row][trail_pos] = _bc(" ") 

604 # Leading space before badge 

605 lead_pos = col + width - badge_w - 3 

606 if col + 1 <= lead_pos < col + width - 1 and lead_pos < total_width: 

607 grid[row][lead_pos] = _bc(" ") 

608 # Badge characters (shifted left by 1 compared to no-padding placement) 

609 pos = col + width - 1 - badge_w - 1 

610 for ch in badge: 

611 ch_w = _wcwidth(ch) 

612 if ch_w < 1: 

613 ch_w = 1 

614 if col + 1 <= pos < col + width - 1 and pos < total_width: 

615 grid[row][pos] = _bc(ch) 

616 if ch_w == 2 and pos + 1 < col + width - 1 and pos + 1 < total_width: 

617 grid[row][pos + 1] = "" 

618 pos += ch_w 

619 

620 # Content rows 

621 for i, line in enumerate(content): 

622 r = row + i + 1 

623 if r >= len(grid): 

624 break 

625 if col < total_width: 

626 grid[r][col] = _bc("\u2502") 

627 if col + width - 1 < total_width: 

628 grid[r][col + width - 1] = _bc("\u2502") 

629 if is_highlighted and i == 0: 

630 colored_line = colorize(line, f"{highlight_color};1") 

631 if col + 2 < total_width: 

632 grid[r][col + 2] = colored_line 

633 for j in range(1, len(line)): 

634 if col + 2 + j < col + width - 1: 

635 grid[r][col + 2 + j] = "" 

636 elif i == 0: 

637 bold_line = colorize(line, "1") 

638 if col + 2 < total_width: 

639 grid[r][col + 2] = bold_line 

640 for j in range(1, len(line)): 

641 if col + 2 + j < col + width - 1: 

642 grid[r][col + 2 + j] = "" 

643 else: 

644 for j, ch in enumerate(line): 

645 if col + 2 + j < col + width - 1: 

646 grid[r][col + 2 + j] = ch 

647 

648 # Padding rows between content and bottom border 

649 for i in range(len(content) + 1, height - 1): 

650 r = row + i 

651 if r >= len(grid): 

652 break 

653 if col < total_width: 

654 grid[r][col] = _bc("\u2502") 

655 if col + width - 1 < total_width: 

656 grid[r][col + width - 1] = _bc("\u2502") 

657 

658 # Bottom border 

659 brow = row + height - 1 

660 if brow < len(grid): 

661 if col < total_width: 

662 grid[brow][col] = _bc("\u2514") 

663 for j in range(1, width - 1): 

664 if col + j < total_width: 

665 grid[brow][col + j] = _bc("\u2500") 

666 if col + width - 1 < total_width: 

667 grid[brow][col + width - 1] = _bc("\u2518") 

668 

669 

670# --------------------------------------------------------------------------- 

671# Layered (vertical) renderer 

672# --------------------------------------------------------------------------- 

673 

674 

675def _render_layered_diagram( 

676 layers: list[list[str]], 

677 edges: list[tuple[str, str, str]], 

678 main_edge_set: set[tuple[str, str]], 

679 branches: list[tuple[str, str, str]], 

680 back_edges: list[tuple[str, str, str]], 

681 initial: str, 

682 terminal_states: set[str] | None, 

683 fsm_states: dict[str, StateConfig] | None, 

684 verbose: bool, 

685 highlight_state: str | None, 

686 highlight_color: str, 

687 edge_label_colors: dict[str, str] | None = None, 

688 badges: dict[str, str] | None = None, 

689) -> str: 

690 """Render FSM using layered (Sugiyama-style) vertical layout.""" 

691 terminal_states = terminal_states or set() 

692 fsm_states = fsm_states or {} 

693 tw = terminal_width() 

694 

695 # Flatten layers to get all states 

696 all_states = [s for layer in layers for s in layer] 

697 if not all_states: 

698 return "" 

699 

700 display_label = _compute_display_labels(all_states, initial, terminal_states) 

701 

702 # Compute max_box_inner based on widest layer 

703 max_layer_size = max(len(layer) for layer in layers) 

704 if verbose: 

705 max_box_inner = max(20, min(60, (tw - 4) // max(1, max_layer_size) - 6)) 

706 else: 

707 max_box_inner = max(20, min(40, (tw - 4) // max(1, max_layer_size) - 6)) 

708 

709 box_inner, box_width, box_height, box_badge = _compute_box_sizes( 

710 all_states, display_label, fsm_states, verbose, max_box_inner, badges 

711 ) 

712 

713 # Post-hoc layer merge: re-merge adjacent single-state layers that were 

714 # over-split by the conservative max_width_per_layer estimate. Only merge 

715 # when both layers receive an edge from the same source state (indicating 

716 # they were originally one layer split by width constraint). 

717 available_w = tw - 20 # conservative content-area estimate 

718 gap_between = 6 

719 # Build edge target sets: for each state, which earlier states point to it 

720 _incoming: dict[str, set[str]] = {s: set() for layer in layers for s in layer} 

721 for src, dst, _ in edges: 

722 if src != dst and dst in _incoming: 

723 _incoming[dst].add(src) 

724 merged = True 

725 while merged: 

726 merged = False 

727 i = 0 

728 while i < len(layers) - 1: 

729 la, lb = layers[i], layers[i + 1] 

730 # Only merge single-state layers that share an incoming source 

731 if len(la) == 1 and len(lb) == 1: 

732 sources_a = _incoming.get(la[0], set()) 

733 sources_b = _incoming.get(lb[0], set()) 

734 shared_source = sources_a & sources_b 

735 else: 

736 shared_source = set() 

737 combined_w = ( 

738 sum(box_width[s] for s in la) 

739 + gap_between * (len(la) - 1) 

740 + gap_between 

741 + sum(box_width[s] for s in lb) 

742 + gap_between * (len(lb) - 1) 

743 ) 

744 if shared_source and combined_w <= available_w and len(la) + len(lb) <= 4: 

745 layers[i] = la + lb 

746 layers.pop(i + 1) 

747 merged = True 

748 else: 

749 i += 1 

750 

751 # Collect ALL non-self-loop forward edge labels (main + branches + same-depth back-edges) 

752 # Multiple edges between the same pair are combined as "label1/label2" 

753 forward_edge_labels: dict[tuple[str, str], str] = {} 

754 for src, dst, lbl in edges: 

755 if src == dst: 

756 continue 

757 if (src, dst) in main_edge_set or (src, dst, lbl) in branches: 

758 if (src, dst) in forward_edge_labels: 

759 forward_edge_labels[(src, dst)] += "/" + lbl 

760 else: 

761 forward_edge_labels[(src, dst)] = lbl 

762 

763 # True back-edges: only those going to an earlier layer (computed after layer assignment) 

764 # Will be finalized below after col positions are computed 

765 # Combine same-pair back-edges into single entries with merged labels (e.g. "error/fail") 

766 back_edge_labels_initial: dict[tuple[str, str], str] = {} 

767 for s, d, lbl in back_edges: 

768 if s != d: 

769 if (s, d) in back_edge_labels_initial: 

770 back_edge_labels_initial[(s, d)] += "/" + lbl 

771 else: 

772 back_edge_labels_initial[(s, d)] = lbl 

773 

774 # Pre-compute layer positions to detect main-path cycle edges early. 

775 # This ensures back_edge_margin accounts for ALL backward-pointing edges 

776 # (including main-path cycles like commit → initial_state) before column 

777 # positions are computed. 

778 prelim_layer_of: dict[str, int] = {} 

779 for li, layer in enumerate(layers): 

780 for s in layer: 

781 prelim_layer_of[s] = li 

782 

783 # Include main-path/branch edges that point backward in margin estimate 

784 all_back_labels: dict[tuple[str, str], str] = dict(back_edge_labels_initial) 

785 for (src, dst), lbl in forward_edge_labels.items(): 

786 src_layer = prelim_layer_of.get(src, -1) 

787 dst_layer = prelim_layer_of.get(dst, -1) 

788 if dst_layer < src_layer: 

789 if (src, dst) in all_back_labels: 

790 all_back_labels[(src, dst)] += "/" + lbl 

791 else: 

792 all_back_labels[(src, dst)] = lbl 

793 

794 non_self_back_initial = [(s, d, lbl) for (s, d), lbl in all_back_labels.items()] 

795 back_edge_margin = 0 

796 if non_self_back_initial: 

797 max_label_len = max(len(lbl) for _, _, lbl in non_self_back_initial) 

798 n_back_initial = len(non_self_back_initial) 

799 back_edge_margin = max_label_len + max(6, 2 * n_back_initial + 2) 

800 

801 content_left = 2 + back_edge_margin 

802 

803 # Self-loops per state 

804 self_loops: dict[str, list[str]] = {} 

805 for src, dst, lbl in back_edges: 

806 if src == dst: 

807 self_loops.setdefault(src, []).append(lbl) 

808 

809 # --- Compute a common center column for alignment --- 

810 # For layers with single boxes, we want vertical alignment through a 

811 # shared center column. Use the widest single-state layer's center. 

812 max_single_w = max((box_width[layer[0]] for layer in layers if len(layer) == 1), default=0) 

813 # The common center is at content_left + max_single_w // 2 

814 common_center = content_left + max_single_w // 2 

815 

816 # Compute column positions per layer 

817 col_start: dict[str, int] = {} 

818 col_center: dict[str, int] = {} 

819 layer_of: dict[str, int] = {} 

820 

821 for li, layer in enumerate(layers): 

822 if len(layer) == 1: 

823 # Single-state layer: center-align to common center 

824 sname = layer[0] 

825 col_start[sname] = common_center - box_width[sname] // 2 

826 col_center[sname] = common_center 

827 layer_of[sname] = li 

828 else: 

829 # Multi-state layer: place side-by-side, centered around common_center 

830 gap_between = 6 

831 total_layer_w = sum(box_width[s] for s in layer) + gap_between * (len(layer) - 1) 

832 x = common_center - total_layer_w // 2 

833 x = max(content_left, x) 

834 for i, sname in enumerate(layer): 

835 col_start[sname] = x 

836 col_center[sname] = x + box_width[sname] // 2 

837 layer_of[sname] = li 

838 if i < len(layer) - 1: 

839 next_s = layer[i + 1] 

840 # Check for edge labels in both directions between adjacent states 

841 label_fwd = forward_edge_labels.get((sname, next_s), "") 

842 label_rev = forward_edge_labels.get((next_s, sname), "") 

843 max_label = max(len(label_fwd), len(label_rev)) 

844 gap = max(gap_between, max_label + 6) if max_label > 0 else gap_between 

845 x += box_width[sname] + gap 

846 else: 

847 x += box_width[sname] 

848 

849 # Reclassify back-edges based on actual layer positions 

850 # Only edges going to an earlier layer are true margin back-edges 

851 # Combine same-pair edges into merged labels (e.g. "error/fail") 

852 back_edge_labels_reclass: dict[tuple[str, str], str] = {} 

853 same_layer_edges: list[tuple[str, str, str]] = [] 

854 for src, dst, lbl in back_edges: 

855 if src == dst: 

856 continue 

857 src_layer = layer_of.get(src, -1) 

858 dst_layer = layer_of.get(dst, -1) 

859 if dst_layer < src_layer: 

860 if (src, dst) in back_edge_labels_reclass: 

861 back_edge_labels_reclass[(src, dst)] += "/" + lbl 

862 else: 

863 back_edge_labels_reclass[(src, dst)] = lbl 

864 elif dst_layer == src_layer: 

865 same_layer_edges.append((src, dst, lbl)) 

866 else: # dst_layer > src_layer: actually forward edge 

867 if (src, dst) in forward_edge_labels: 

868 forward_edge_labels[(src, dst)] += "/" + lbl 

869 else: 

870 forward_edge_labels[(src, dst)] = lbl 

871 

872 # Also reclassify main/branch edges in forward_edge_labels that point backward 

873 # after layer assignment (e.g. main-path cycle edges like commit → initial_state) 

874 backward_in_fwd: list[tuple[str, str]] = [] 

875 for (src, dst), lbl in forward_edge_labels.items(): 

876 src_layer = layer_of.get(src, -1) 

877 dst_layer = layer_of.get(dst, -1) 

878 if dst_layer < src_layer: 

879 backward_in_fwd.append((src, dst)) 

880 if (src, dst) in back_edge_labels_reclass: 

881 back_edge_labels_reclass[(src, dst)] += "/" + lbl 

882 else: 

883 back_edge_labels_reclass[(src, dst)] = lbl 

884 elif dst_layer == src_layer and src != dst: 

885 backward_in_fwd.append((src, dst)) 

886 same_layer_edges.append((src, dst, lbl)) 

887 for key in backward_in_fwd: 

888 del forward_edge_labels[key] 

889 

890 # Add same-layer back-edges to forward_edge_labels so gap calculation accounts for them 

891 for src, dst, lbl in same_layer_edges: 

892 if (src, dst) in forward_edge_labels: 

893 forward_edge_labels[(src, dst)] += "/" + lbl 

894 else: 

895 forward_edge_labels[(src, dst)] = lbl 

896 

897 # Recalculate inter-box gaps for layers with newly discovered same-layer edges 

898 affected_layers: set[int] = set() 

899 for src, dst, _lbl in same_layer_edges: 

900 sl = layer_of.get(src, -1) 

901 dl = layer_of.get(dst, -1) 

902 if sl >= 0: 

903 affected_layers.add(sl) 

904 if dl >= 0: 

905 affected_layers.add(dl) 

906 for li in affected_layers: 

907 layer = layers[li] 

908 if len(layer) < 2: 

909 continue 

910 gap_between = 6 

911 total_layer_w = sum(box_width[s] for s in layer) 

912 # For non-adjacent same-layer edges the label lands in the gap immediately 

913 # adjacent to the source box (left of src for right-to-left, right of src 

914 # for left-to-right). Collect those requirements so the gap is wide enough. 

915 extra_gap_req: dict[tuple[str, str], int] = {} 

916 for src, dst, lbl in same_layer_edges: 

917 if layer_of.get(src) != li or layer_of.get(dst) != li: 

918 continue 

919 try: 

920 si, di = layer.index(src), layer.index(dst) 

921 except ValueError: 

922 continue 

923 if abs(si - di) <= 1: 

924 continue # adjacent — already handled by forward_edge_labels 

925 if si > di: 

926 key = (layer[si - 1], src) # gap to the left of src 

927 else: 

928 key = (src, layer[si + 1]) # gap to the right of src 

929 extra_gap_req[key] = max(extra_gap_req.get(key, 0), len(lbl)) 

930 # Recalculate gaps with label-aware spacing 

931 gaps: list[int] = [] 

932 for i in range(len(layer) - 1): 

933 sname, next_s = layer[i], layer[i + 1] 

934 label_fwd = forward_edge_labels.get((sname, next_s), "") 

935 label_rev = forward_edge_labels.get((next_s, sname), "") 

936 max_label = max(len(label_fwd), len(label_rev), extra_gap_req.get((sname, next_s), 0)) 

937 gap = max(gap_between, max_label + 6) if max_label > 0 else gap_between 

938 gaps.append(gap) 

939 total_layer_w += sum(gaps) 

940 x = common_center - total_layer_w // 2 

941 x = max(content_left, x) 

942 for i, sname in enumerate(layer): 

943 col_start[sname] = x 

944 col_center[sname] = x + box_width[sname] // 2 

945 if i < len(layer) - 1: 

946 x += box_width[sname] + gaps[i] 

947 else: 

948 x += box_width[sname] 

949 

950 non_self_back = [(s, d, lbl) for (s, d), lbl in back_edge_labels_reclass.items()] 

951 

952 # Recalculate back-edge margin if it changed 

953 if non_self_back: 

954 max_label_len = max(len(lbl) for _, _, lbl in non_self_back) 

955 n_back = len(non_self_back) 

956 actual_margin = max_label_len + max(6, 2 * n_back + 2) 

957 if actual_margin != back_edge_margin: 

958 # Need to recalculate positions (rare case - usually matches) 

959 back_edge_margin = actual_margin 

960 content_left = 2 + back_edge_margin 

961 

962 # Identify forward skip-layer edges (span > 1 layer, not handled by consecutive renderer) 

963 skip_forward_edges: list[tuple[str, str, str]] = [] 

964 for (src, dst), lbl in forward_edge_labels.items(): 

965 src_layer = layer_of.get(src, -1) 

966 dst_layer = layer_of.get(dst, -1) 

967 if dst_layer > src_layer + 1: 

968 skip_forward_edges.append((src, dst, lbl)) 

969 

970 # Pre-compute right margin width for forward skip-layer edges 

971 right_edge_margin = 0 

972 if skip_forward_edges: 

973 max_fwd_label_len = max(len(lbl) for _, _, lbl in skip_forward_edges) 

974 right_edge_margin = max_fwd_label_len + 6 

975 

976 # Compute total width needed 

977 total_content_w = 0 

978 for s in all_states: 

979 right = col_start[s] + box_width[s] 

980 total_content_w = max(total_content_w, right) 

981 total_width = max(total_content_w + right_edge_margin + 4, tw) 

982 

983 # Compute vertical positions with space for self-loops and inter-layer arrows 

984 row_start: dict[str, int] = {} 

985 y = 0 

986 for li, layer in enumerate(layers): 

987 layer_h = max(box_height[s] for s in layer) 

988 for sname in layer: 

989 row_start[sname] = y 

990 y += layer_h 

991 

992 # Add self-loop row if any state in this layer has self-loops 

993 has_self_loop = any(s in self_loops for s in layer) 

994 if has_self_loop: 

995 y += 1 # self-loop marker row 

996 

997 if li < len(layers) - 1: 

998 y += 3 # arrow gap: │, label, ▼ 

999 

1000 total_height = y 

1001 

1002 # Build character grid 

1003 grid: list[list[str]] = [[" "] * total_width for _ in range(total_height)] 

1004 

1005 # Draw boxes 

1006 for sname in all_states: 

1007 is_highlighted = highlight_state is not None and sname == highlight_state 

1008 _draw_box( 

1009 grid, 

1010 row_start[sname], 

1011 col_start[sname], 

1012 box_width[sname], 

1013 box_height[sname], 

1014 box_inner[sname], 

1015 is_highlighted, 

1016 highlight_color, 

1017 badge=box_badge[sname], 

1018 ) 

1019 

1020 # Precompute box-occupied (row, col) pairs so connector lines can avoid overwriting box cells 

1021 _box_occ: dict[int, set[int]] = {} 

1022 for _s in all_states: 

1023 for _r in range(row_start[_s], row_start[_s] + box_height[_s]): 

1024 _row_set = _box_occ.setdefault(_r, set()) 

1025 for _c in range(col_start[_s], col_start[_s] + box_width[_s]): 

1026 _row_set.add(_c) 

1027 

1028 # Draw self-loop markers immediately below their boxes 

1029 for sname, labels in self_loops.items(): 

1030 marker = "\u21ba " + ", ".join(labels) 

1031 r = row_start[sname] + box_height[sname] 

1032 if r < total_height: 

1033 cx = col_center[sname] 

1034 pos = max(0, cx - len(marker) // 2) 

1035 for j, ch in enumerate(marker): 

1036 if pos + j < total_width: 

1037 grid[r][pos + j] = ch 

1038 

1039 # Shared row tracker: prevents two labels (back-edge, skip-forward, or adjacent) 

1040 # landing on the same grid row, which would clobber the first label written there. 

1041 used_label_rows: set[int] = set() 

1042 

1043 # Draw forward edges between layers (vertical arrows with labels) 

1044 for li in range(len(layers) - 1): 

1045 layer_h = max(box_height[s] for s in layers[li]) 

1046 has_self_loop = any(s in self_loops for s in layers[li]) 

1047 self_loop_offset = 1 if has_self_loop else 0 

1048 

1049 # Arrow area starts after box bottom + self-loop row 

1050 arrow_start_row = row_start[layers[li][0]] + layer_h + self_loop_offset 

1051 arrow_end_row = row_start[layers[li + 1][0]] - 1 

1052 

1053 # Collect all inter-layer edges from this layer to the next 

1054 inter_edges: list[tuple[str, str, str]] = [] 

1055 for src in layers[li]: 

1056 for dst in layers[li + 1]: 

1057 label = forward_edge_labels.get((src, dst)) 

1058 if label is not None: 

1059 inter_edges.append((src, dst, label)) 

1060 

1061 # Draw each edge with its own vertical pipe to the target's center 

1062 for src, dst, label in inter_edges: 

1063 dst_cc = col_center[dst] 

1064 src_left = col_start[src] 

1065 src_right = src_left + box_width[src] 

1066 ec = _edge_line_color(label) # ANSI code for this edge's connector chars 

1067 

1068 def _lc(ch: str, _ec: str = ec) -> str: # noqa: E306 

1069 return colorize(ch, _ec) if _ec else ch 

1070 

1071 # Horizontal connector when pipe is outside source box range 

1072 if dst_cc >= src_right or dst_cc < src_left: 

1073 conn_row = arrow_start_row 

1074 if 0 <= conn_row < total_height: 

1075 if dst_cc >= src_right: 

1076 # Pipe right of source: └───┐ 

1077 src_cc = col_center[src] 

1078 if 0 <= src_cc < total_width and grid[conn_row][src_cc] == " ": 

1079 grid[conn_row][src_cc] = _lc("\u2514") # └ 

1080 start_c = src_cc + 1 

1081 else: 

1082 start_c = src_right 

1083 for c in range(start_c, dst_cc): 

1084 if 0 <= c < total_width: 

1085 grid[conn_row][c] = _lc("\u2500") 

1086 if 0 <= dst_cc < total_width: 

1087 grid[conn_row][dst_cc] = _lc("\u2510") # ┐ 

1088 else: 

1089 # Pipe left of source: ┌───┘ 

1090 src_cc = col_center[src] 

1091 if 0 <= src_cc < total_width and grid[conn_row][src_cc] == " ": 

1092 end_c = src_cc 

1093 grid[conn_row][src_cc] = _lc("\u2518") # ┘ 

1094 else: 

1095 end_c = src_left 

1096 for c in range(dst_cc + 1, end_c): 

1097 if 0 <= c < total_width: 

1098 grid[conn_row][c] = _lc("\u2500") 

1099 if 0 <= dst_cc < total_width: 

1100 grid[conn_row][dst_cc] = _lc("\u250c") # ┌ 

1101 pipe_start = arrow_start_row + 1 

1102 else: 

1103 pipe_start = arrow_start_row 

1104 

1105 # Draw vertical pipe at destination's center column 

1106 for r in range(pipe_start, arrow_end_row): 

1107 if 0 <= dst_cc < total_width and r < total_height: 

1108 grid[r][dst_cc] = _lc("\u2502") 

1109 

1110 # Arrow tip at destination center 

1111 if arrow_end_row < total_height and 0 <= dst_cc < total_width: 

1112 grid[arrow_end_row][dst_cc] = _lc("\u25bc") 

1113 

1114 # Label to the right of the pipe. When skip-layer forward edges 

1115 # exist their vertical pipes occupy columns starting at 

1116 # total_content_w+2, so clamp to total_content_w to avoid 

1117 # overwriting them (BUG-1500). Without skip-layer edges the right 

1118 # margin is empty and the full total_width is available. 

1119 label_row = arrow_start_row 

1120 # Nudge if this row already has a label from another inter-layer edge 

1121 # (e.g., two edges from the same source go to states in the same layer). 

1122 # Try pipe rows (pipe_start..arrow_end_row) before giving up (BUG-1501). 

1123 if label_row in used_label_rows: 

1124 for _cand in range(pipe_start, arrow_end_row + 1): 

1125 if _cand not in used_label_rows: 

1126 label_row = _cand 

1127 break 

1128 if label_row < total_height: 

1129 used_label_rows.add(label_row) 

1130 label_start = dst_cc + 2 

1131 max_col = total_content_w if skip_forward_edges else total_width 

1132 max_label = max_col - label_start 

1133 if 0 < max_label < len(label): 

1134 label = label[: max_label - 1] + "…" 

1135 for j, ch in enumerate(label): 

1136 if label_start + j < max_col: 

1137 grid[label_row][label_start + j] = ch 

1138 

1139 # Post-pass: connect horizontal gaps for multi-branch sources 

1140 if len(inter_edges) >= 2 and 0 <= arrow_start_row < total_height: 

1141 src_targets: dict[str, list[int]] = {} 

1142 for src, dst, _ in inter_edges: 

1143 if dst in col_center: 

1144 src_targets.setdefault(src, []).append(col_center[dst]) 

1145 for _src, centers in src_targets.items(): 

1146 if len(centers) < 2: 

1147 continue 

1148 left = min(centers) 

1149 right = max(centers) 

1150 for c in range(left + 1, right): 

1151 if 0 <= c < total_width: 

1152 cell = grid[arrow_start_row][c] 

1153 if cell == " ": 

1154 grid[arrow_start_row][c] = "\u2500" # ─ 

1155 elif cell == "\u2502": # │ → ┼ 

1156 grid[arrow_start_row][c] = "\u253c" 

1157 elif cell == "\u2518": # ┘ → ┴ 

1158 grid[arrow_start_row][c] = "\u2534" 

1159 elif cell == "\u2514": # └ → ┴ 

1160 grid[arrow_start_row][c] = "\u2534" 

1161 elif cell == "\u2510": # ┐ → ┬ 

1162 grid[arrow_start_row][c] = "\u252c" 

1163 elif cell == "\u250c": # ┌ → ┬ 

1164 grid[arrow_start_row][c] = "\u252c" 

1165 # Update boundary junction chars where the horizontal bar meets a pipe 

1166 if 0 <= left < total_width and grid[arrow_start_row][left] == "\u2502": # │ → ├ 

1167 grid[arrow_start_row][left] = "\u251c" 

1168 if 0 <= right < total_width and grid[arrow_start_row][right] == "\u2502": # │ → ┤ 

1169 grid[arrow_start_row][right] = "\u2524" 

1170 

1171 # Draw same-layer edges (horizontal arrows between states on same layer) 

1172 # Includes both branches and reclassified back-edges within same layer 

1173 all_same_layer: list[tuple[str, str, str]] = list(same_layer_edges) 

1174 for _li, layer in enumerate(layers): 

1175 for i, src in enumerate(layer): 

1176 for j, dst in enumerate(layer): 

1177 if i == j: 

1178 continue 

1179 label = forward_edge_labels.get((src, dst)) 

1180 if label is not None and (src, dst, label) not in all_same_layer: 

1181 all_same_layer.append((src, dst, label)) 

1182 

1183 for src, dst, label in all_same_layer: 

1184 if src not in col_start or dst not in col_start: 

1185 continue 

1186 name_row = row_start[src] + 1 

1187 src_right = col_start[src] + box_width[src] 

1188 dst_right = col_start[dst] + box_width[dst] 

1189 dst_left = col_start[dst] 

1190 src_left = col_start[src] 

1191 _row_boxes = _box_occ.get(name_row, set()) 

1192 ec = _edge_line_color(label) 

1193 

1194 def _lc(ch: str, _ec: str = ec) -> str: # noqa: E306 

1195 return colorize(ch, _ec) if _ec else ch 

1196 

1197 if dst_left >= src_right: 

1198 # Left to right horizontal arrow: src ──label──▶ dst 

1199 start = src_right 

1200 end = dst_left 

1201 edge_text = "\u2500" + label + "\u2500\u2500\u25b6" 

1202 available = end - start 

1203 if available < len(edge_text): 

1204 edge_text = edge_text[: max(1, available)] 

1205 left_dashes = max(0, available - len(edge_text)) 

1206 for k in range(left_dashes): 

1207 pos = start + k 

1208 if pos < total_width and name_row < total_height and pos not in _row_boxes: 

1209 grid[name_row][pos] = _lc("\u2500") 

1210 for k, ch in enumerate(edge_text): 

1211 pos = start + left_dashes + k 

1212 if ( 

1213 0 <= pos < end 

1214 and pos < total_width 

1215 and name_row < total_height 

1216 and pos not in _row_boxes 

1217 ): 

1218 grid[name_row][pos] = _lc(ch) 

1219 elif dst_right <= src_left: 

1220 # Right to left: dst is left of src: src → dst drawn as dst ◀──label── src 

1221 start = dst_right 

1222 end = src_left 

1223 edge_text = "\u25c4\u2500\u2500" + label + "\u2500" 

1224 available = end - start 

1225 if available < len(edge_text): 

1226 edge_text = edge_text[: max(1, available)] 

1227 for k, ch in enumerate(edge_text): 

1228 pos = start + k 

1229 if ( 

1230 0 <= pos < end 

1231 and pos < total_width 

1232 and name_row < total_height 

1233 and pos not in _row_boxes 

1234 ): 

1235 grid[name_row][pos] = _lc(ch) 

1236 for k in range(start + len(edge_text), end): 

1237 if k < total_width and name_row < total_height and k not in _row_boxes: 

1238 grid[name_row][k] = _lc("\u2500") 

1239 

1240 # Back-edges: left-margin vertical arrows with labels 

1241 if non_self_back: 

1242 sorted_back = sorted( 

1243 non_self_back, 

1244 key=lambda e: abs(row_start.get(e[0], 0) - row_start.get(e[1], 0)), 

1245 reverse=True, 

1246 ) 

1247 used_cols: list[int] = [] 

1248 # Compute rightmost pipe column so labels go right of ALL pipes 

1249 rightmost_pipe_col = 1 + (len(sorted_back) - 1) * 2 

1250 

1251 for src, dst, label in sorted_back: 

1252 # Source: name row of source box; target: name row of target box 

1253 src_row = row_start.get(src, 0) + 1 # name row, not bottom border 

1254 dst_row = row_start.get(dst, 0) + 1 # name row 

1255 

1256 # Find a free column in the margin 

1257 col = 1 

1258 for uc in sorted(used_cols): 

1259 if uc == col: 

1260 col += 2 

1261 used_cols.append(col) 

1262 

1263 if col >= content_left - 1: 

1264 continue 

1265 

1266 top_row = min(src_row, dst_row) 

1267 bot_row = max(src_row, dst_row) 

1268 ec = _edge_line_color(label) 

1269 

1270 def _lc(ch: str, _ec: str = ec) -> str: # noqa: E306 

1271 return colorize(ch, _ec) if _ec else ch 

1272 

1273 # Draw vertical line in margin (exclude corner rows handled below) 

1274 for r in range(top_row + 1, bot_row): 

1275 if 0 <= r < total_height and col < total_width: 

1276 cell = grid[r][col] 

1277 if cell == "\u2500": # ─ → ┼ (junction, leave uncolored) 

1278 grid[r][col] = "\u253c" 

1279 elif cell == " ": 

1280 grid[r][col] = _lc("\u2502") 

1281 

1282 # Horizontal connector from source box to margin 

1283 # Draw right-to-left, crossing existing pipes with junction chars 

1284 if 0 <= src_row < total_height: 

1285 src_left = col_start.get(src, col + 1) 

1286 _src_row_boxes = _box_occ.get(src_row, set()) 

1287 for c in range(col + 1, src_left): 

1288 if c < total_width and c not in _src_row_boxes: 

1289 cell = grid[src_row][c] 

1290 if cell == " ": 

1291 grid[src_row][c] = _lc("\u2500") # ─ 

1292 elif cell == "\u2502": # │ → ┼ (junction) 

1293 grid[src_row][c] = "\u253c" 

1294 elif cell == "\u2514": # └ → ┴ (junction) 

1295 grid[src_row][c] = "\u2534" 

1296 elif cell == "\u250c": # ┌ → ┬ (junction) 

1297 grid[src_row][c] = "\u252c" 

1298 elif cell == "\u251c": # ├ → ┼ (junction) 

1299 grid[src_row][c] = "\u253c" 

1300 # Leave ─, ▶, box chars unchanged 

1301 

1302 # Horizontal connector from margin to target box 

1303 # Draw right-to-left, crossing existing pipes with junction chars 

1304 dst_left = col_start.get(dst, col + 1) 

1305 if 0 <= dst_row < total_height: 

1306 _dst_row_boxes = _box_occ.get(dst_row, set()) 

1307 for c in range(col + 1, dst_left): 

1308 if c < total_width and c not in _dst_row_boxes: 

1309 cell = grid[dst_row][c] 

1310 if cell == " ": 

1311 grid[dst_row][c] = _lc("\u2500") # ─ 

1312 elif cell == "\u2502": # │ → ┼ (junction) 

1313 grid[dst_row][c] = "\u253c" 

1314 elif cell == "\u2514": # └ → ┴ (junction) 

1315 grid[dst_row][c] = "\u2534" 

1316 elif cell == "\u250c": # ┌ → ┬ (junction) 

1317 grid[dst_row][c] = "\u252c" 

1318 elif cell == "\u251c": # ├ → ┼ (junction) 

1319 grid[dst_row][c] = "\u253c" 

1320 

1321 # Corner characters at pipe-to-horizontal turn points 

1322 for row in (src_row, dst_row): 

1323 if 0 <= row < total_height and col < total_width: 

1324 existing = grid[row][col] 

1325 if row == bot_row: 

1326 # Pipe ends, turns right: └; if horizontal already crosses here: ┴ 

1327 grid[row][col] = "\u2534" if existing == "\u2500" else _lc("\u2514") 

1328 else: # row == top_row 

1329 # Pipe starts going down, turns right: ┌; if horizontal already crosses here: ┬ 

1330 grid[row][col] = "\u252c" if existing == "\u2500" else _lc("\u250c") 

1331 

1332 # Arrow tip at target: place ▶ at end of horizontal connector (entering box from left) 

1333 if 0 <= dst_row < total_height and dst_left - 1 > col and dst_left - 1 < total_width: 

1334 grid[dst_row][dst_left - 1] = _lc("\u25b6") 

1335 

1336 # Label on the margin line (right of ALL pipes, not just this one) 

1337 label_row_pos = (top_row + bot_row) // 2 

1338 # Nudge away from already-used rows to prevent clobbering earlier labels 

1339 if label_row_pos in used_label_rows and top_row + 1 < bot_row: 

1340 midpoint = label_row_pos 

1341 found = False 

1342 for _off in range(1, bot_row - top_row): 

1343 for _cand in (midpoint - _off, midpoint + _off): 

1344 if top_row < _cand < bot_row and _cand not in used_label_rows: 

1345 label_row_pos = _cand 

1346 found = True 

1347 break 

1348 if found: 

1349 break 

1350 if not found: 

1351 label_row_pos = top_row + 1 

1352 used_label_rows.add(label_row_pos) 

1353 if 0 <= label_row_pos < total_height: 

1354 label_start = rightmost_pipe_col + 2 

1355 for j, ch in enumerate(label): 

1356 if label_start + j < content_left - 1 and label_start + j < total_width: 

1357 grid[label_row_pos][label_start + j] = _lc(ch) 

1358 

1359 # Forward skip-layer edges: right-margin vertical arrows with labels 

1360 # Symmetric to the left-margin back-edge renderer above 

1361 if skip_forward_edges: 

1362 sorted_fwd_skip = sorted( 

1363 skip_forward_edges, 

1364 key=lambda e: abs(row_start.get(e[0], 0) - row_start.get(e[1], 0)), 

1365 reverse=True, 

1366 ) 

1367 used_fwd_cols: list[int] = [] 

1368 # Rightmost pipe column (furthest from content) for label placement 

1369 rightmost_fwd_pipe_col = total_content_w + 2 + (len(sorted_fwd_skip) - 1) * 2 

1370 

1371 for src, dst, label in sorted_fwd_skip: 

1372 src_row = row_start.get(src, 0) + 1 # name row 

1373 dst_row = row_start.get(dst, 0) + 1 # name row 

1374 

1375 # Allocate column in right margin (starting from content edge, going right) 

1376 col = total_content_w + 2 

1377 for uc in sorted(used_fwd_cols): 

1378 if uc == col: 

1379 col += 2 

1380 used_fwd_cols.append(col) 

1381 

1382 if col >= total_width: 

1383 continue 

1384 

1385 top_row = min(src_row, dst_row) 

1386 bot_row = max(src_row, dst_row) 

1387 ec = _edge_line_color(label) 

1388 

1389 def _lc(ch: str, _ec: str = ec) -> str: # noqa: E306 

1390 return colorize(ch, _ec) if _ec else ch 

1391 

1392 # Draw vertical line in right margin (exclude corner rows handled below) 

1393 for r in range(top_row + 1, bot_row): 

1394 if 0 <= r < total_height and col < total_width: 

1395 cell = grid[r][col] 

1396 if cell == "\u2500": # ─ → ┼ (junction) 

1397 grid[r][col] = "\u253c" 

1398 elif cell == " ": 

1399 grid[r][col] = _lc("\u2502") 

1400 

1401 # Horizontal connector from source box right side to margin 

1402 # Draw left-to-right, crossing existing pipes with junction chars 

1403 src_right = col_start.get(src, 0) + box_width.get(src, 0) 

1404 _src_row_boxes = _box_occ.get(src_row, set()) 

1405 if 0 <= src_row < total_height: 

1406 for c in range(src_right, col): 

1407 if 0 <= c < total_width and c not in _src_row_boxes: 

1408 cell = grid[src_row][c] 

1409 if cell == " ": 

1410 grid[src_row][c] = _lc("\u2500") # ─ 

1411 elif cell == "\u2502": # │ → ┼ (junction) 

1412 grid[src_row][c] = "\u253c" 

1413 elif cell == "\u2518": # ┘ → ┴ (junction) 

1414 grid[src_row][c] = "\u2534" 

1415 elif cell == "\u2510": # ┐ → ┬ (junction) 

1416 grid[src_row][c] = "\u252c" 

1417 elif cell == "\u2524": # ┤ → ┼ (junction) 

1418 grid[src_row][c] = "\u253c" 

1419 # Leave ─, ◀, box chars unchanged 

1420 

1421 # Horizontal connector from margin to destination box right side 

1422 dst_right = col_start.get(dst, 0) + box_width.get(dst, 0) 

1423 _dst_row_boxes = _box_occ.get(dst_row, set()) 

1424 if 0 <= dst_row < total_height: 

1425 for c in range(dst_right, col): 

1426 if 0 <= c < total_width and c not in _dst_row_boxes: 

1427 cell = grid[dst_row][c] 

1428 if cell == " ": 

1429 grid[dst_row][c] = _lc("\u2500") # ─ 

1430 elif cell == "\u2502": # │ → ┼ (junction) 

1431 grid[dst_row][c] = "\u253c" 

1432 elif cell == "\u2518": # ┘ → ┴ (junction) 

1433 grid[dst_row][c] = "\u2534" 

1434 elif cell == "\u2510": # ┐ → ┬ (junction) 

1435 grid[dst_row][c] = "\u252c" 

1436 elif cell == "\u2524": # ┤ → ┼ (junction) 

1437 grid[dst_row][c] = "\u253c" 

1438 

1439 # Corner characters at pipe-to-horizontal turn points 

1440 for row in (src_row, dst_row): 

1441 if 0 <= row < total_height and col < total_width: 

1442 existing = grid[row][col] 

1443 if row == bot_row: 

1444 # Pipe ends, turns left: ┘; if horizontal crosses: ┤ 

1445 grid[row][col] = "\u2524" if existing == "\u2500" else _lc("\u2518") 

1446 else: # row == top_row 

1447 # Pipe starts going down, turns left: ┐; if horizontal crosses: ┤ 

1448 grid[row][col] = "\u2524" if existing == "\u2500" else _lc("\u2510") 

1449 

1450 # Arrow tip at target: ◀ entering box from right side 

1451 if 0 <= dst_row < total_height and dst_right < col and dst_right < total_width: 

1452 grid[dst_row][dst_right] = _lc("\u25c0") 

1453 

1454 # Label on the margin line (right of ALL pipes, mirroring left-margin 

1455 # approach). Truncate with … when label would exceed total_width 

1456 # to prevent extending diagram lines far past the content area (BUG-1500). 

1457 label_row_pos = (top_row + bot_row) // 2 

1458 # Nudge to avoid row collision with a previously-placed label (back- or fwd-edge) 

1459 if label_row_pos in used_label_rows and top_row + 1 < bot_row: 

1460 midpoint = label_row_pos 

1461 found = False 

1462 for _off in range(1, bot_row - top_row): 

1463 for _cand in (midpoint - _off, midpoint + _off): 

1464 if top_row < _cand < bot_row and _cand not in used_label_rows: 

1465 label_row_pos = _cand 

1466 found = True 

1467 break 

1468 if found: 

1469 break 

1470 if not found: 

1471 label_row_pos = top_row + 1 

1472 used_label_rows.add(label_row_pos) 

1473 if 0 <= label_row_pos < total_height: 

1474 label_start = rightmost_fwd_pipe_col + 2 

1475 max_label = total_width - label_start 

1476 if 0 < max_label < len(label): 

1477 label = label[: max_label - 1] + "…" 

1478 for j, ch in enumerate(label): 

1479 if label_start + j < total_width: 

1480 grid[label_row_pos][label_start + j] = _lc(ch) 

1481 

1482 # Convert grid to string 

1483 lines = ["".join(row).rstrip() for row in grid] 

1484 

1485 # Remove trailing empty lines 

1486 while lines and not lines[-1].strip(): 

1487 lines.pop() 

1488 

1489 # Center diagram 

1490 max_line_len = max((len(_ANSI_ESCAPE_RE.sub("", ln)) for ln in lines), default=0) 

1491 diagram_indent = max(0, (tw - max_line_len) // 2) 

1492 if diagram_indent > 0: 

1493 lines = [" " * diagram_indent + ln if ln.strip() else ln for ln in lines] 

1494 

1495 return _colorize_diagram_labels("\n".join(lines), edge_label_colors) 

1496 

1497 

1498# --------------------------------------------------------------------------- 

1499# FSM diagram renderer (main entry point) 

1500# --------------------------------------------------------------------------- 

1501 

1502 

1503def _render_fsm_diagram( 

1504 fsm: FSMLoop, 

1505 verbose: bool = False, 

1506 highlight_state: str | None = None, 

1507 highlight_color: str = "32", 

1508 edge_label_colors: dict[str, str] | None = None, 

1509 badges: dict[str, str] | None = None, 

1510) -> str: 

1511 """Render an adaptive text diagram of the FSM graph. 

1512 

1513 Detects FSM topology and selects appropriate layout: 

1514 - Linear chains: vertical top-to-bottom 

1515 - Branching/cyclic: layered Sugiyama-style 

1516 

1517 Args: 

1518 fsm: The FSM loop to render. 

1519 verbose: If True, show expanded action content in boxes. 

1520 highlight_state: If provided, render this state's box with the highlight color. 

1521 highlight_color: ANSI SGR code for the highlighted state (default: green). 

1522 edge_label_colors: Optional label→SGR-code mapping for transition labels. 

1523 Falls back to hardcoded defaults when None. 

1524 badges: Optional glyph-key→string mapping for state type badges. 

1525 Falls back to hardcoded defaults when None. 

1526 """ 

1527 edges = _collect_edges(fsm) 

1528 bfs_order_list, bfs_depth = _bfs_order(fsm.initial, edges) 

1529 main_path, main_edge_set = _trace_main_path(fsm, edges) 

1530 branches, back_edges = _classify_edges(edges, main_edge_set, bfs_order_list) 

1531 

1532 terminal_states = {name for name, state in fsm.states.items() if state.terminal} 

1533 

1534 # Collect all states 

1535 all_states = list(main_path) 

1536 for src, dst, _ in branches: 

1537 for s in (src, dst): 

1538 if s not in all_states: 

1539 all_states.append(s) 

1540 

1541 # Topology detection 

1542 detector = TopologyDetector(edges, main_path, branches, back_edges) 

1543 topology = detector.classify() 

1544 

1545 # Build back-edge set for layout pipeline 

1546 back_edge_set: set[tuple[str, str]] = set() 

1547 for src, dst, _ in back_edges: 

1548 if src != dst: 

1549 back_edge_set.add((src, dst)) 

1550 

1551 tw = terminal_width() 

1552 

1553 if topology == "linear" and len(all_states) <= 1: 

1554 # Single state or empty — use simple horizontal 

1555 return _render_horizontal_simple( 

1556 main_path, 

1557 edges, 

1558 main_edge_set, 

1559 branches, 

1560 back_edges, 

1561 bfs_order_list, 

1562 fsm.initial, 

1563 terminal_states, 

1564 fsm.states, 

1565 verbose, 

1566 highlight_state, 

1567 highlight_color, 

1568 edge_label_colors, 

1569 badges, 

1570 ) 

1571 

1572 # Compute max node width to determine width constraint 

1573 # Quick estimate: widest state name or badge + padding 

1574 max_node_w = 30 # reasonable default 

1575 for s in all_states: 

1576 st = fsm.states.get(s) 

1577 badge = _get_state_badge(st, badges) 

1578 badge_w = _badge_display_width(badge) if badge else 0 

1579 label = s 

1580 if s == fsm.initial: 

1581 label = "\u2192 " + label 

1582 if s in terminal_states: 

1583 label = label + " \u25c9" 

1584 w = max(len(label), badge_w) 

1585 max_node_w = max(max_node_w, w + 4 + 4) # inner + borders + padding 

1586 

1587 max_width_per_layer = max(1, (tw - 10) // (max_node_w + 4)) 

1588 

1589 # Layer assignment 

1590 assigner = LayerAssigner(all_states, edges, back_edge_set, fsm.initial, max_width_per_layer) 

1591 layers = assigner.assign() 

1592 

1593 # Crossing minimization 

1594 minimizer = CrossingMinimizer(layers, edges, back_edge_set) 

1595 layers = minimizer.minimize() 

1596 

1597 return _render_layered_diagram( 

1598 layers, 

1599 edges, 

1600 main_edge_set, 

1601 branches, 

1602 back_edges, 

1603 fsm.initial, 

1604 terminal_states, 

1605 fsm.states, 

1606 verbose, 

1607 highlight_state, 

1608 highlight_color, 

1609 edge_label_colors, 

1610 badges, 

1611 ) 

1612 

1613 

1614def _render_horizontal_simple( 

1615 main_path: list[str], 

1616 edges: list[tuple[str, str, str]], 

1617 main_edge_set: set[tuple[str, str]], 

1618 branches: list[tuple[str, str, str]], 

1619 back_edges: list[tuple[str, str, str]], 

1620 bfs_order: list[str], 

1621 initial: str, 

1622 terminal_states: set[str], 

1623 fsm_states: dict[str, StateConfig], 

1624 verbose: bool, 

1625 highlight_state: str | None, 

1626 highlight_color: str, 

1627 edge_label_colors: dict[str, str] | None = None, 

1628 badges: dict[str, str] | None = None, 

1629) -> str: 

1630 """Simple horizontal rendering for single-state or very simple FSMs.""" 

1631 if not main_path: 

1632 return "" 

1633 

1634 all_states = list(main_path) 

1635 display_label = _compute_display_labels(all_states, initial, terminal_states) 

1636 

1637 tw = terminal_width() 

1638 num_main = max(1, len(main_path)) 

1639 if verbose and fsm_states and main_path: 

1640 max_box_inner = max(20, min(60, (tw - 4) // num_main - 6)) 

1641 else: 

1642 max_box_inner = max(20, min(40, (tw - 4) // num_main - 6)) 

1643 

1644 box_inner, box_width, box_height, box_badge = _compute_box_sizes( 

1645 all_states, display_label, fsm_states, verbose, max_box_inner, badges 

1646 ) 

1647 

1648 main_height = max((box_height[s] for s in main_path), default=3) 

1649 total_width = tw 

1650 

1651 # Column positions 

1652 col_start: dict[str, int] = {} 

1653 col_center: dict[str, int] = {} 

1654 x = 2 

1655 for i, sname in enumerate(main_path): 

1656 col_start[sname] = x 

1657 col_center[sname] = x + box_width[sname] // 2 

1658 x += box_width[sname] 

1659 if i < len(main_path) - 1: 

1660 x += 4 

1661 

1662 rows: list[list[str]] = [[" "] * total_width for _ in range(main_height)] 

1663 

1664 for sname in main_path: 

1665 is_highlighted = highlight_state is not None and sname == highlight_state 

1666 _draw_box( 

1667 rows, 

1668 0, 

1669 col_start[sname], 

1670 box_width[sname], 

1671 main_height, 

1672 box_inner[sname], 

1673 is_highlighted, 

1674 highlight_color, 

1675 badge=box_badge[sname], 

1676 ) 

1677 

1678 # Self-loops 

1679 self_loops_list = [(s, d, lbl) for s, d, lbl in back_edges if s == d] 

1680 lines = ["".join(row).rstrip() for row in rows] 

1681 if self_loops_list: 

1682 self_labels: dict[str, list[str]] = {} 

1683 for src, _, label in self_loops_list: 

1684 self_labels.setdefault(src, []).append(label) 

1685 for sname, labels in self_labels.items(): 

1686 marker = "\u21ba " + ", ".join(labels) 

1687 self_row = [" "] * total_width 

1688 cx = col_center.get(sname, 0) 

1689 pos = max(0, cx - len(marker) // 2) 

1690 for j, ch in enumerate(marker): 

1691 if pos + j < total_width: 

1692 self_row[pos + j] = ch 

1693 lines.append("".join(self_row).rstrip()) 

1694 

1695 diagram_indent = max(0, (tw - (x + 4)) // 2) 

1696 if diagram_indent > 0: 

1697 lines = [" " * diagram_indent + ln if ln.strip() else ln for ln in lines] 

1698 

1699 return _colorize_diagram_labels("\n".join(lines), edge_label_colors)