Coverage for little_loops / cli / loop / layout.py: 92%
1030 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""FSM diagram layout engine.
3Implements adaptive layout for FSM diagrams using a Sugiyama-inspired
4layered graph drawing approach. Supports topology detection, vertical
5linear chains, side-by-side branching, and margin back-edge rendering.
7Extracted from info.py and extended with adaptive layout capabilities.
8"""
10from __future__ import annotations
12import re
13from collections import deque
15from wcwidth import wcswidth as _wcswidth
16from wcwidth import wcwidth as _wcwidth
18from little_loops.cli.output import colorize, terminal_width
19from little_loops.fsm.schema import FSMLoop, StateConfig
21_ANSI_ESCAPE_RE = re.compile(r"\033\[[0-9;]*m")
23# ---------------------------------------------------------------------------
24# Edge label colorization
25# ---------------------------------------------------------------------------
27_EDGE_LABEL_COLORS: dict[str, str] = {
28 "yes": "32",
29 "no": "38;5;208",
30 "error": "31",
31 "partial": "33",
32 "next": "2",
33 "_": "2",
34 "blocked": "31",
35 "retry_exhausted": "38;5;208",
36 "rate_limit_exhausted": "38;5;214",
37 "throttle_hard": "38;5;196",
38}
41def _colorize_label(label: str) -> str:
42 """Colorize a (possibly compound) edge label like 'no/error'."""
43 parts = label.split("/")
44 code = ""
45 for part in parts:
46 if part in ("no", "error"):
47 code = _EDGE_LABEL_COLORS["no"]
48 break
49 if part == "partial" and not code:
50 code = _EDGE_LABEL_COLORS["partial"]
51 elif part == "yes" and not code:
52 code = _EDGE_LABEL_COLORS["yes"]
53 elif part in ("next", "_") and not code:
54 code = _EDGE_LABEL_COLORS["next"]
55 return colorize(label, code) if code else label
58def _edge_line_color(label: str) -> str:
59 """Return the ANSI SGR code to use for connector characters of an edge.
61 Applies the same priority cascade as ``_colorize_label`` so that line
62 characters (│, ─, ▼, ▶, corners) match the semantic color of their label.
63 Returns an empty string when no color applies (callers treat this as "no-op").
64 """
65 parts = label.split("/")
66 code = ""
67 for part in parts:
68 if part in (
69 "no",
70 "error",
71 "blocked",
72 "retry_exhausted",
73 "rate_limit_exhausted",
74 "throttle_hard",
75 ):
76 return _EDGE_LABEL_COLORS.get(part, "31")
77 if part == "partial" and not code:
78 code = _EDGE_LABEL_COLORS["partial"]
79 elif part == "yes" and not code:
80 code = _EDGE_LABEL_COLORS["yes"]
81 elif part in ("next", "_") and not code:
82 code = _EDGE_LABEL_COLORS["next"]
83 return code
86def _colorize_diagram_labels(diagram: str, colors: dict[str, str] | None = None) -> str:
87 """Apply ANSI color to known edge labels in a rendered diagram string.
89 Labels are colorized only when bounded by box-drawing or whitespace chars
90 to avoid coloring partial matches inside state names.
92 Args:
93 colors: Optional label→SGR-code mapping; falls back to _EDGE_LABEL_COLORS if None.
94 """
95 label_colors = colors if colors is not None else _EDGE_LABEL_COLORS
96 for label, code in label_colors.items():
97 colored = colorize(label, code)
98 diagram = re.sub(
99 r"(?<=[─ │▶\n])" + re.escape(label) + r"(?=[─ │▶\n])",
100 colored,
101 diagram,
102 )
103 return diagram
106# ---------------------------------------------------------------------------
107# State box badge definitions
108# ---------------------------------------------------------------------------
110_ACTION_TYPE_BADGES: dict[str, str] = {
111 "prompt": "\u2726", # ✦
112 "slash_command": "/\u2501\u25ba", # /━►
113 "shell": "\u276f_", # ❯_
114 "mcp_tool": "\u26a1", # ⚡
115}
117_SUB_LOOP_BADGE = "\u21b3\u27f3" # ↳⟳
118_ROUTE_BADGE = "\u2443" # ⑃
121def _badge_display_width(badge: str) -> int:
122 """Compute terminal display width of a badge string using wcwidth."""
123 w = _wcswidth(badge)
124 return w if w >= 0 else len(badge)
127def _get_state_badge(state: StateConfig | None, badges: dict[str, str] | None = None) -> str:
128 """Return the unicode badge string for a state, or '' if none."""
129 if state is None:
130 return ""
131 effective = {**_ACTION_TYPE_BADGES, **(badges or {})}
132 sub_loop_badge = (badges or {}).get("sub_loop", _SUB_LOOP_BADGE)
133 route_badge = (badges or {}).get("route", _ROUTE_BADGE)
134 if state.loop is not None:
135 return sub_loop_badge
136 if state.action_type:
137 return effective.get(state.action_type, f"[{state.action_type}]")
138 if state.action:
139 return effective["shell"]
140 if state.route is not None:
141 return route_badge
142 return ""
145# ---------------------------------------------------------------------------
146# Box content helpers for multi-row diagram boxes
147# ---------------------------------------------------------------------------
150def _box_inner_lines(
151 state: StateConfig | None,
152 display_label: str,
153 verbose: bool,
154 inner_width: int,
155) -> list[str]:
156 """Return interior lines for a state box (between top and bottom borders).
158 The first line is always ``display_label`` + type badge (if any).
159 Subsequent lines are action content lines. All lines fit within
160 ``inner_width`` characters (content is truncated or wrapped accordingly).
161 """
162 # Badge is now rendered in the top-right corner by _draw_box; name row is label only
163 name_line = display_label[:inner_width]
165 lines: list[str] = [name_line]
167 # Action lines
168 if state and state.action:
169 action_src = [ln.rstrip() for ln in state.action.strip().splitlines()]
170 if verbose:
171 for src in action_src:
172 if not src:
173 continue
174 # Wrap long lines to inner_width
175 while len(src) > inner_width:
176 lines.append(src[:inner_width])
177 src = src[inner_width:]
178 if src:
179 lines.append(src)
180 else:
181 # Show first non-empty line, truncated
182 first = next((ln for ln in action_src if ln), "")
183 if len(first) > inner_width:
184 first = first[: inner_width - 1] + "\u2026"
185 if first:
186 lines.append(first)
188 return lines
191# ---------------------------------------------------------------------------
192# Topology detection
193# ---------------------------------------------------------------------------
196def _collect_edges(fsm: FSMLoop) -> list[tuple[str, str, str]]:
197 """Collect all (source, target, label) edges from an FSM."""
198 edges: list[tuple[str, str, str]] = []
199 for name, state in fsm.states.items():
200 if state.on_yes:
201 edges.append((name, state.on_yes, "yes"))
202 if state.on_no:
203 edges.append((name, state.on_no, "no"))
204 if state.on_error:
205 edges.append((name, state.on_error, "error"))
206 if state.on_partial:
207 edges.append((name, state.on_partial, "partial"))
208 if state.on_blocked:
209 edges.append((name, state.on_blocked, "blocked"))
210 if state.on_retry_exhausted:
211 edges.append((name, state.on_retry_exhausted, "retry_exhausted"))
212 if state.on_rate_limit_exhausted:
213 edges.append((name, state.on_rate_limit_exhausted, "rate_limit_exhausted"))
214 if state.on_throttle_hard:
215 edges.append((name, state.on_throttle_hard, "throttle_hard"))
216 if state.next:
217 edges.append((name, state.next, "next"))
218 if state.route:
219 for verdict, target in state.route.routes.items():
220 edges.append((name, target, verdict))
221 if state.route.default:
222 edges.append((name, state.route.default, "_"))
223 for verdict, target in state.extra_routes.items():
224 edges.append((name, target, verdict))
225 return edges
228def _bfs_order(initial: str, edges: list[tuple[str, str, str]]) -> tuple[list[str], dict[str, int]]:
229 """BFS from initial state. Returns (order, depth_map)."""
230 order: list[str] = []
231 depth: dict[str, int] = {initial: 0}
232 queue: deque[str] = deque([initial])
233 while queue:
234 node = queue.popleft()
235 order.append(node)
236 for src, dst, _ in edges:
237 if src == node and dst not in depth:
238 depth[dst] = depth[node] + 1
239 queue.append(dst)
240 return order, depth
243def _trace_main_path(
244 fsm: FSMLoop, edges: list[tuple[str, str, str]]
245) -> tuple[list[str], set[tuple[str, str]]]:
246 """Trace the main (happy) path through the FSM."""
247 visited: set[str] = set()
248 main_path: list[str] = []
249 main_edge_set: set[tuple[str, str]] = set()
250 current = fsm.initial
251 while current and current not in visited:
252 visited.add(current)
253 main_path.append(current)
254 st = fsm.states.get(current)
255 if not st or st.terminal:
256 break
257 nxt: str = st.on_yes or st.next or ""
258 if not nxt and st.route:
259 nxt = next(iter(st.route.routes.values()), "") or st.route.default or ""
260 if nxt:
261 main_edge_set.add((current, nxt))
262 current = nxt
263 else:
264 break
265 return main_path, main_edge_set
268def _classify_edges(
269 edges: list[tuple[str, str, str]],
270 main_edge_set: set[tuple[str, str]],
271 bfs_order: list[str],
272) -> tuple[list[tuple[str, str, str]], list[tuple[str, str, str]]]:
273 """Split non-main edges into branches and back_edges."""
274 main_consumed: set[int] = set()
275 for src, dst in main_edge_set:
276 for i, (s, d, _) in enumerate(edges):
277 if s == src and d == dst and i not in main_consumed:
278 main_consumed.add(i)
279 break
281 branches: list[tuple[str, str, str]] = []
282 back_edges: list[tuple[str, str, str]] = []
283 for i, (src, dst, label) in enumerate(edges):
284 if i in main_consumed:
285 continue
286 src_pos = bfs_order.index(src) if src in bfs_order else len(bfs_order)
287 dst_pos = bfs_order.index(dst) if dst in bfs_order else len(bfs_order)
288 if dst == src or dst_pos < src_pos:
289 back_edges.append((src, dst, label))
290 else:
291 branches.append((src, dst, label))
292 return branches, back_edges
295class TopologyDetector:
296 """Classify FSM graph topology for layout strategy selection."""
298 def __init__(
299 self,
300 edges: list[tuple[str, str, str]],
301 main_path: list[str],
302 branches: list[tuple[str, str, str]],
303 back_edges: list[tuple[str, str, str]],
304 ) -> None:
305 self._edges = edges
306 self._main_path = main_path
307 self._branches = branches
308 self._back_edges = back_edges
310 def classify(self) -> str:
311 """Return 'linear', 'tree', or 'general'.
313 Linear: main path only, no non-self branches, only self-loop back-edges.
314 Tree: branches exist but no fan-in (every non-initial state has ≤1 incoming).
315 General: everything else (full Sugiyama needed).
316 """
317 non_self_branches = [b for b in self._branches if b[0] != b[1]]
318 non_self_back = [b for b in self._back_edges if b[0] != b[1]]
320 if not non_self_branches and not non_self_back:
321 return "linear"
323 # Check for fan-in: any state with >1 incoming forward edge
324 in_count: dict[str, int] = {}
325 for _, dst, _ in self._edges:
326 # Only count forward edges (not back-edges)
327 in_count[dst] = in_count.get(dst, 0) + 1
329 if not non_self_back and all(v <= 1 for v in in_count.values()):
330 return "tree"
332 return "general"
335# ---------------------------------------------------------------------------
336# Sugiyama layout pipeline
337# ---------------------------------------------------------------------------
340class LayerAssigner:
341 """Assign nodes to layers using longest-path + width constraint."""
343 def __init__(
344 self,
345 all_states: list[str],
346 edges: list[tuple[str, str, str]],
347 back_edge_set: set[tuple[str, str]],
348 initial: str,
349 max_width: int = 4,
350 ) -> None:
351 self._all_states = all_states
352 self._edges = edges
353 self._back_edge_set = back_edge_set
354 self._initial = initial
355 self._max_width = max_width
357 def assign(self) -> list[list[str]]:
358 """Return list of layers, each a list of state names (top to bottom)."""
359 # Build adjacency (forward edges only)
360 forward: dict[str, list[str]] = {s: [] for s in self._all_states}
361 reverse: dict[str, list[str]] = {s: [] for s in self._all_states}
362 seen_edges: set[tuple[str, str]] = set()
363 for src, dst, _ in self._edges:
364 if (src, dst) in self._back_edge_set or src == dst:
365 continue
366 if src in forward and dst in forward and (src, dst) not in seen_edges:
367 forward[src].append(dst)
368 reverse[dst].append(src)
369 seen_edges.add((src, dst))
371 # Longest-path layer assignment (topological order)
372 layer_of: dict[str, int] = {}
374 # Kahn's algorithm for topological order
375 in_degree = {s: len(reverse[s]) for s in self._all_states}
376 queue: deque[str] = deque()
377 for s in self._all_states:
378 if in_degree[s] == 0:
379 queue.append(s)
381 topo_order: list[str] = []
382 while queue:
383 node = queue.popleft()
384 topo_order.append(node)
385 for dst in forward[node]:
386 in_degree[dst] -= 1
387 if in_degree[dst] == 0:
388 queue.append(dst)
390 # Handle nodes not reached by topo sort (cycles in forward graph)
391 for s in self._all_states:
392 if s not in set(topo_order):
393 topo_order.append(s)
395 # Assign layers: longest path from root
396 for node in topo_order:
397 if not reverse[node]:
398 layer_of[node] = 0
399 else:
400 layer_of[node] = max(
401 (layer_of.get(p, 0) + 1 for p in reverse[node]),
402 default=0,
403 )
405 # Ensure initial state is at layer 0
406 if self._initial in layer_of and layer_of[self._initial] != 0:
407 offset = layer_of[self._initial]
408 for s in layer_of:
409 layer_of[s] -= offset
411 # Build layers list
412 max_layer = max(layer_of.values(), default=0)
413 layers: list[list[str]] = [[] for _ in range(max_layer + 1)]
414 for s in self._all_states:
415 layer = layer_of.get(s, 0)
416 layers[layer].append(s)
418 # Width constraint: if any layer exceeds max_width, split
419 if self._max_width > 0:
420 new_layers: list[list[str]] = []
421 for grp in layers:
422 remaining = list(grp)
423 while len(remaining) > self._max_width:
424 new_layers.append(remaining[: self._max_width])
425 remaining = remaining[self._max_width :]
426 if remaining:
427 new_layers.append(remaining)
428 layers = new_layers
430 return layers
433class CrossingMinimizer:
434 """Minimize edge crossings using barycenter heuristic."""
436 def __init__(
437 self,
438 layers: list[list[str]],
439 edges: list[tuple[str, str, str]],
440 back_edge_set: set[tuple[str, str]],
441 ) -> None:
442 self._layers = layers
443 self._edges = edges
444 self._back_edge_set = back_edge_set
446 def minimize(self) -> list[list[str]]:
447 """Return reordered layers with reduced crossings."""
448 # Build position lookup
449 layer_of: dict[str, int] = {}
450 for i, layer in enumerate(self._layers):
451 for s in layer:
452 layer_of[s] = i
454 # Forward adjacency (non-back, non-self)
455 adj_down: dict[str, list[str]] = {}
456 adj_up: dict[str, list[str]] = {}
457 for src, dst, _ in self._edges:
458 if (src, dst) in self._back_edge_set or src == dst:
459 continue
460 if src in layer_of and dst in layer_of:
461 adj_down.setdefault(src, []).append(dst)
462 adj_up.setdefault(dst, []).append(src)
464 layers = [list(layer) for layer in self._layers]
466 # 3 sweeps: down, up, down
467 for sweep in range(3):
468 if sweep % 2 == 0:
469 # Top-down sweep
470 for i in range(1, len(layers)):
471 pos_above = {s: j for j, s in enumerate(layers[i - 1])}
472 bary: dict[str, float] = {}
473 for s in layers[i]:
474 parents = [p for p in adj_up.get(s, []) if p in pos_above]
475 if parents:
476 bary[s] = sum(pos_above[p] for p in parents) / len(parents)
477 else:
478 bary[s] = float(layers[i].index(s))
479 layers[i].sort(key=lambda s: bary.get(s, 0))
480 else:
481 # Bottom-up sweep
482 for i in range(len(layers) - 2, -1, -1):
483 pos_below = {s: j for j, s in enumerate(layers[i + 1])}
484 bary_up: dict[str, float] = {}
485 for s in layers[i]:
486 children = [c for c in adj_down.get(s, []) if c in pos_below]
487 if children:
488 bary_up[s] = sum(pos_below[c] for c in children) / len(children)
489 else:
490 bary_up[s] = float(layers[i].index(s))
491 layers[i].sort(key=lambda s: bary_up.get(s, 0))
493 return layers
496# ---------------------------------------------------------------------------
497# Rendering helpers
498# ---------------------------------------------------------------------------
501def _compute_display_labels(
502 all_states: list[str],
503 initial: str,
504 terminal_states: set[str],
505) -> dict[str, str]:
506 """Compute display labels with → prefix and ◉ suffix."""
507 display_label: dict[str, str] = {}
508 for s in all_states:
509 label = s
510 if s == initial:
511 label = "\u2192 " + label
512 if s in terminal_states:
513 label = label + " \u25c9"
514 display_label[s] = label
515 return display_label
518def _compute_box_sizes(
519 all_states: list[str],
520 display_label: dict[str, str],
521 fsm_states: dict[str, StateConfig] | None,
522 verbose: bool,
523 max_box_inner: int,
524 badges: dict[str, str] | None = None,
525) -> tuple[dict[str, list[str]], dict[str, int], dict[str, int], dict[str, str]]:
526 """Compute box content, widths, and heights for all states.
528 Returns (box_inner, box_width, box_height, box_badge).
529 """
530 box_inner: dict[str, list[str]] = {}
531 box_width: dict[str, int] = {}
532 box_badge: dict[str, str] = {}
534 for s in all_states:
535 state_obj = fsm_states.get(s) if fsm_states else None
537 badge = _get_state_badge(state_obj, badges)
538 badge_w = _badge_display_width(badge) if badge else 0
539 box_badge[s] = badge
541 # Width must fit: name label on content row, badge on top border (with one space
542 # of padding on each side: " badge ")
543 base_w = max(len(display_label[s]), badge_w + 2 if badge_w else 0)
545 inner_w = base_w
546 if state_obj and state_obj.action and max_box_inner > 0:
547 action_lines = state_obj.action.strip().splitlines()
548 if verbose:
549 max_action_w = max(
550 (len(ln.rstrip()) for ln in action_lines if ln.rstrip()), default=0
551 )
552 inner_w = max(base_w, min(max_action_w, max_box_inner))
553 else:
554 first_action = next((ln.rstrip() for ln in action_lines if ln.rstrip()), "")
555 inner_w = max(base_w, min(len(first_action), max_box_inner))
557 content = _box_inner_lines(state_obj, display_label[s], verbose, inner_w)
558 actual_w = max(len(ln) for ln in content)
559 inner_w = max(inner_w, actual_w)
560 box_inner[s] = content
561 box_width[s] = inner_w + 4 # "│ " + content + " │"
563 box_height: dict[str, int] = {s: len(box_inner[s]) + 2 for s in all_states}
564 return box_inner, box_width, box_height, box_badge
567def _draw_box(
568 grid: list[list[str]],
569 row: int,
570 col: int,
571 width: int,
572 height: int,
573 content: list[str],
574 is_highlighted: bool,
575 highlight_color: str,
576 badge: str = "",
577) -> None:
578 """Draw a state box onto a character grid at (row, col).
580 If *badge* is provided it is placed right-aligned in the top border row with
581 one space of padding on each side (``─ badge ┐``), colorized via ``_bc()``.
582 """
583 total_width = len(grid[0]) if grid else 0
585 def _bc(ch: str) -> str:
586 return colorize(ch, highlight_color) if is_highlighted else ch
588 # Top border: ┌ ─ ─ … ─ ┐
589 if col < total_width:
590 grid[row][col] = _bc("\u250c")
591 for j in range(1, width - 1):
592 if col + j < total_width:
593 grid[row][col + j] = _bc("\u2500")
594 if col + width - 1 < total_width:
595 grid[row][col + width - 1] = _bc("\u2510")
597 # Overlay badge in top-right corner with one space of padding on each side: " badge ┐"
598 if badge:
599 badge_w = _badge_display_width(badge)
600 # Trailing space between badge end and ┐
601 trail_pos = col + width - 2
602 if col + 1 <= trail_pos < col + width - 1 and trail_pos < total_width:
603 grid[row][trail_pos] = _bc(" ")
604 # Leading space before badge
605 lead_pos = col + width - badge_w - 3
606 if col + 1 <= lead_pos < col + width - 1 and lead_pos < total_width:
607 grid[row][lead_pos] = _bc(" ")
608 # Badge characters (shifted left by 1 compared to no-padding placement)
609 pos = col + width - 1 - badge_w - 1
610 for ch in badge:
611 ch_w = _wcwidth(ch)
612 if ch_w < 1:
613 ch_w = 1
614 if col + 1 <= pos < col + width - 1 and pos < total_width:
615 grid[row][pos] = _bc(ch)
616 if ch_w == 2 and pos + 1 < col + width - 1 and pos + 1 < total_width:
617 grid[row][pos + 1] = ""
618 pos += ch_w
620 # Content rows
621 for i, line in enumerate(content):
622 r = row + i + 1
623 if r >= len(grid):
624 break
625 if col < total_width:
626 grid[r][col] = _bc("\u2502")
627 if col + width - 1 < total_width:
628 grid[r][col + width - 1] = _bc("\u2502")
629 if is_highlighted and i == 0:
630 colored_line = colorize(line, f"{highlight_color};1")
631 if col + 2 < total_width:
632 grid[r][col + 2] = colored_line
633 for j in range(1, len(line)):
634 if col + 2 + j < col + width - 1:
635 grid[r][col + 2 + j] = ""
636 elif i == 0:
637 bold_line = colorize(line, "1")
638 if col + 2 < total_width:
639 grid[r][col + 2] = bold_line
640 for j in range(1, len(line)):
641 if col + 2 + j < col + width - 1:
642 grid[r][col + 2 + j] = ""
643 else:
644 for j, ch in enumerate(line):
645 if col + 2 + j < col + width - 1:
646 grid[r][col + 2 + j] = ch
648 # Padding rows between content and bottom border
649 for i in range(len(content) + 1, height - 1):
650 r = row + i
651 if r >= len(grid):
652 break
653 if col < total_width:
654 grid[r][col] = _bc("\u2502")
655 if col + width - 1 < total_width:
656 grid[r][col + width - 1] = _bc("\u2502")
658 # Bottom border
659 brow = row + height - 1
660 if brow < len(grid):
661 if col < total_width:
662 grid[brow][col] = _bc("\u2514")
663 for j in range(1, width - 1):
664 if col + j < total_width:
665 grid[brow][col + j] = _bc("\u2500")
666 if col + width - 1 < total_width:
667 grid[brow][col + width - 1] = _bc("\u2518")
670# ---------------------------------------------------------------------------
671# Layered (vertical) renderer
672# ---------------------------------------------------------------------------
675def _render_layered_diagram(
676 layers: list[list[str]],
677 edges: list[tuple[str, str, str]],
678 main_edge_set: set[tuple[str, str]],
679 branches: list[tuple[str, str, str]],
680 back_edges: list[tuple[str, str, str]],
681 initial: str,
682 terminal_states: set[str] | None,
683 fsm_states: dict[str, StateConfig] | None,
684 verbose: bool,
685 highlight_state: str | None,
686 highlight_color: str,
687 edge_label_colors: dict[str, str] | None = None,
688 badges: dict[str, str] | None = None,
689) -> str:
690 """Render FSM using layered (Sugiyama-style) vertical layout."""
691 terminal_states = terminal_states or set()
692 fsm_states = fsm_states or {}
693 tw = terminal_width()
695 # Flatten layers to get all states
696 all_states = [s for layer in layers for s in layer]
697 if not all_states:
698 return ""
700 display_label = _compute_display_labels(all_states, initial, terminal_states)
702 # Compute max_box_inner based on widest layer
703 max_layer_size = max(len(layer) for layer in layers)
704 if verbose:
705 max_box_inner = max(20, min(60, (tw - 4) // max(1, max_layer_size) - 6))
706 else:
707 max_box_inner = max(20, min(40, (tw - 4) // max(1, max_layer_size) - 6))
709 box_inner, box_width, box_height, box_badge = _compute_box_sizes(
710 all_states, display_label, fsm_states, verbose, max_box_inner, badges
711 )
713 # Post-hoc layer merge: re-merge adjacent single-state layers that were
714 # over-split by the conservative max_width_per_layer estimate. Only merge
715 # when both layers receive an edge from the same source state (indicating
716 # they were originally one layer split by width constraint).
717 available_w = tw - 20 # conservative content-area estimate
718 gap_between = 6
719 # Build edge target sets: for each state, which earlier states point to it
720 _incoming: dict[str, set[str]] = {s: set() for layer in layers for s in layer}
721 for src, dst, _ in edges:
722 if src != dst and dst in _incoming:
723 _incoming[dst].add(src)
724 merged = True
725 while merged:
726 merged = False
727 i = 0
728 while i < len(layers) - 1:
729 la, lb = layers[i], layers[i + 1]
730 # Only merge single-state layers that share an incoming source
731 if len(la) == 1 and len(lb) == 1:
732 sources_a = _incoming.get(la[0], set())
733 sources_b = _incoming.get(lb[0], set())
734 shared_source = sources_a & sources_b
735 else:
736 shared_source = set()
737 combined_w = (
738 sum(box_width[s] for s in la)
739 + gap_between * (len(la) - 1)
740 + gap_between
741 + sum(box_width[s] for s in lb)
742 + gap_between * (len(lb) - 1)
743 )
744 if shared_source and combined_w <= available_w and len(la) + len(lb) <= 4:
745 layers[i] = la + lb
746 layers.pop(i + 1)
747 merged = True
748 else:
749 i += 1
751 # Collect ALL non-self-loop forward edge labels (main + branches + same-depth back-edges)
752 # Multiple edges between the same pair are combined as "label1/label2"
753 forward_edge_labels: dict[tuple[str, str], str] = {}
754 for src, dst, lbl in edges:
755 if src == dst:
756 continue
757 if (src, dst) in main_edge_set or (src, dst, lbl) in branches:
758 if (src, dst) in forward_edge_labels:
759 forward_edge_labels[(src, dst)] += "/" + lbl
760 else:
761 forward_edge_labels[(src, dst)] = lbl
763 # True back-edges: only those going to an earlier layer (computed after layer assignment)
764 # Will be finalized below after col positions are computed
765 # Combine same-pair back-edges into single entries with merged labels (e.g. "error/fail")
766 back_edge_labels_initial: dict[tuple[str, str], str] = {}
767 for s, d, lbl in back_edges:
768 if s != d:
769 if (s, d) in back_edge_labels_initial:
770 back_edge_labels_initial[(s, d)] += "/" + lbl
771 else:
772 back_edge_labels_initial[(s, d)] = lbl
774 # Pre-compute layer positions to detect main-path cycle edges early.
775 # This ensures back_edge_margin accounts for ALL backward-pointing edges
776 # (including main-path cycles like commit → initial_state) before column
777 # positions are computed.
778 prelim_layer_of: dict[str, int] = {}
779 for li, layer in enumerate(layers):
780 for s in layer:
781 prelim_layer_of[s] = li
783 # Include main-path/branch edges that point backward in margin estimate
784 all_back_labels: dict[tuple[str, str], str] = dict(back_edge_labels_initial)
785 for (src, dst), lbl in forward_edge_labels.items():
786 src_layer = prelim_layer_of.get(src, -1)
787 dst_layer = prelim_layer_of.get(dst, -1)
788 if dst_layer < src_layer:
789 if (src, dst) in all_back_labels:
790 all_back_labels[(src, dst)] += "/" + lbl
791 else:
792 all_back_labels[(src, dst)] = lbl
794 non_self_back_initial = [(s, d, lbl) for (s, d), lbl in all_back_labels.items()]
795 back_edge_margin = 0
796 if non_self_back_initial:
797 max_label_len = max(len(lbl) for _, _, lbl in non_self_back_initial)
798 n_back_initial = len(non_self_back_initial)
799 back_edge_margin = max_label_len + max(6, 2 * n_back_initial + 2)
801 content_left = 2 + back_edge_margin
803 # Self-loops per state
804 self_loops: dict[str, list[str]] = {}
805 for src, dst, lbl in back_edges:
806 if src == dst:
807 self_loops.setdefault(src, []).append(lbl)
809 # --- Compute a common center column for alignment ---
810 # For layers with single boxes, we want vertical alignment through a
811 # shared center column. Use the widest single-state layer's center.
812 max_single_w = max((box_width[layer[0]] for layer in layers if len(layer) == 1), default=0)
813 # The common center is at content_left + max_single_w // 2
814 common_center = content_left + max_single_w // 2
816 # Compute column positions per layer
817 col_start: dict[str, int] = {}
818 col_center: dict[str, int] = {}
819 layer_of: dict[str, int] = {}
821 for li, layer in enumerate(layers):
822 if len(layer) == 1:
823 # Single-state layer: center-align to common center
824 sname = layer[0]
825 col_start[sname] = common_center - box_width[sname] // 2
826 col_center[sname] = common_center
827 layer_of[sname] = li
828 else:
829 # Multi-state layer: place side-by-side, centered around common_center
830 gap_between = 6
831 total_layer_w = sum(box_width[s] for s in layer) + gap_between * (len(layer) - 1)
832 x = common_center - total_layer_w // 2
833 x = max(content_left, x)
834 for i, sname in enumerate(layer):
835 col_start[sname] = x
836 col_center[sname] = x + box_width[sname] // 2
837 layer_of[sname] = li
838 if i < len(layer) - 1:
839 next_s = layer[i + 1]
840 # Check for edge labels in both directions between adjacent states
841 label_fwd = forward_edge_labels.get((sname, next_s), "")
842 label_rev = forward_edge_labels.get((next_s, sname), "")
843 max_label = max(len(label_fwd), len(label_rev))
844 gap = max(gap_between, max_label + 6) if max_label > 0 else gap_between
845 x += box_width[sname] + gap
846 else:
847 x += box_width[sname]
849 # Reclassify back-edges based on actual layer positions
850 # Only edges going to an earlier layer are true margin back-edges
851 # Combine same-pair edges into merged labels (e.g. "error/fail")
852 back_edge_labels_reclass: dict[tuple[str, str], str] = {}
853 same_layer_edges: list[tuple[str, str, str]] = []
854 for src, dst, lbl in back_edges:
855 if src == dst:
856 continue
857 src_layer = layer_of.get(src, -1)
858 dst_layer = layer_of.get(dst, -1)
859 if dst_layer < src_layer:
860 if (src, dst) in back_edge_labels_reclass:
861 back_edge_labels_reclass[(src, dst)] += "/" + lbl
862 else:
863 back_edge_labels_reclass[(src, dst)] = lbl
864 elif dst_layer == src_layer:
865 same_layer_edges.append((src, dst, lbl))
866 else: # dst_layer > src_layer: actually forward edge
867 if (src, dst) in forward_edge_labels:
868 forward_edge_labels[(src, dst)] += "/" + lbl
869 else:
870 forward_edge_labels[(src, dst)] = lbl
872 # Also reclassify main/branch edges in forward_edge_labels that point backward
873 # after layer assignment (e.g. main-path cycle edges like commit → initial_state)
874 backward_in_fwd: list[tuple[str, str]] = []
875 for (src, dst), lbl in forward_edge_labels.items():
876 src_layer = layer_of.get(src, -1)
877 dst_layer = layer_of.get(dst, -1)
878 if dst_layer < src_layer:
879 backward_in_fwd.append((src, dst))
880 if (src, dst) in back_edge_labels_reclass:
881 back_edge_labels_reclass[(src, dst)] += "/" + lbl
882 else:
883 back_edge_labels_reclass[(src, dst)] = lbl
884 elif dst_layer == src_layer and src != dst:
885 backward_in_fwd.append((src, dst))
886 same_layer_edges.append((src, dst, lbl))
887 for key in backward_in_fwd:
888 del forward_edge_labels[key]
890 # Add same-layer back-edges to forward_edge_labels so gap calculation accounts for them
891 for src, dst, lbl in same_layer_edges:
892 if (src, dst) in forward_edge_labels:
893 forward_edge_labels[(src, dst)] += "/" + lbl
894 else:
895 forward_edge_labels[(src, dst)] = lbl
897 # Recalculate inter-box gaps for layers with newly discovered same-layer edges
898 affected_layers: set[int] = set()
899 for src, dst, _lbl in same_layer_edges:
900 sl = layer_of.get(src, -1)
901 dl = layer_of.get(dst, -1)
902 if sl >= 0:
903 affected_layers.add(sl)
904 if dl >= 0:
905 affected_layers.add(dl)
906 for li in affected_layers:
907 layer = layers[li]
908 if len(layer) < 2:
909 continue
910 gap_between = 6
911 total_layer_w = sum(box_width[s] for s in layer)
912 # For non-adjacent same-layer edges the label lands in the gap immediately
913 # adjacent to the source box (left of src for right-to-left, right of src
914 # for left-to-right). Collect those requirements so the gap is wide enough.
915 extra_gap_req: dict[tuple[str, str], int] = {}
916 for src, dst, lbl in same_layer_edges:
917 if layer_of.get(src) != li or layer_of.get(dst) != li:
918 continue
919 try:
920 si, di = layer.index(src), layer.index(dst)
921 except ValueError:
922 continue
923 if abs(si - di) <= 1:
924 continue # adjacent — already handled by forward_edge_labels
925 if si > di:
926 key = (layer[si - 1], src) # gap to the left of src
927 else:
928 key = (src, layer[si + 1]) # gap to the right of src
929 extra_gap_req[key] = max(extra_gap_req.get(key, 0), len(lbl))
930 # Recalculate gaps with label-aware spacing
931 gaps: list[int] = []
932 for i in range(len(layer) - 1):
933 sname, next_s = layer[i], layer[i + 1]
934 label_fwd = forward_edge_labels.get((sname, next_s), "")
935 label_rev = forward_edge_labels.get((next_s, sname), "")
936 max_label = max(len(label_fwd), len(label_rev), extra_gap_req.get((sname, next_s), 0))
937 gap = max(gap_between, max_label + 6) if max_label > 0 else gap_between
938 gaps.append(gap)
939 total_layer_w += sum(gaps)
940 x = common_center - total_layer_w // 2
941 x = max(content_left, x)
942 for i, sname in enumerate(layer):
943 col_start[sname] = x
944 col_center[sname] = x + box_width[sname] // 2
945 if i < len(layer) - 1:
946 x += box_width[sname] + gaps[i]
947 else:
948 x += box_width[sname]
950 non_self_back = [(s, d, lbl) for (s, d), lbl in back_edge_labels_reclass.items()]
952 # Recalculate back-edge margin if it changed
953 if non_self_back:
954 max_label_len = max(len(lbl) for _, _, lbl in non_self_back)
955 n_back = len(non_self_back)
956 actual_margin = max_label_len + max(6, 2 * n_back + 2)
957 if actual_margin != back_edge_margin:
958 # Need to recalculate positions (rare case - usually matches)
959 back_edge_margin = actual_margin
960 content_left = 2 + back_edge_margin
962 # Identify forward skip-layer edges (span > 1 layer, not handled by consecutive renderer)
963 skip_forward_edges: list[tuple[str, str, str]] = []
964 for (src, dst), lbl in forward_edge_labels.items():
965 src_layer = layer_of.get(src, -1)
966 dst_layer = layer_of.get(dst, -1)
967 if dst_layer > src_layer + 1:
968 skip_forward_edges.append((src, dst, lbl))
970 # Pre-compute right margin width for forward skip-layer edges
971 right_edge_margin = 0
972 if skip_forward_edges:
973 max_fwd_label_len = max(len(lbl) for _, _, lbl in skip_forward_edges)
974 right_edge_margin = max_fwd_label_len + 6
976 # Compute total width needed
977 total_content_w = 0
978 for s in all_states:
979 right = col_start[s] + box_width[s]
980 total_content_w = max(total_content_w, right)
981 total_width = max(total_content_w + right_edge_margin + 4, tw)
983 # Compute vertical positions with space for self-loops and inter-layer arrows
984 row_start: dict[str, int] = {}
985 y = 0
986 for li, layer in enumerate(layers):
987 layer_h = max(box_height[s] for s in layer)
988 for sname in layer:
989 row_start[sname] = y
990 y += layer_h
992 # Add self-loop row if any state in this layer has self-loops
993 has_self_loop = any(s in self_loops for s in layer)
994 if has_self_loop:
995 y += 1 # self-loop marker row
997 if li < len(layers) - 1:
998 y += 3 # arrow gap: │, label, ▼
1000 total_height = y
1002 # Build character grid
1003 grid: list[list[str]] = [[" "] * total_width for _ in range(total_height)]
1005 # Draw boxes
1006 for sname in all_states:
1007 is_highlighted = highlight_state is not None and sname == highlight_state
1008 _draw_box(
1009 grid,
1010 row_start[sname],
1011 col_start[sname],
1012 box_width[sname],
1013 box_height[sname],
1014 box_inner[sname],
1015 is_highlighted,
1016 highlight_color,
1017 badge=box_badge[sname],
1018 )
1020 # Precompute box-occupied (row, col) pairs so connector lines can avoid overwriting box cells
1021 _box_occ: dict[int, set[int]] = {}
1022 for _s in all_states:
1023 for _r in range(row_start[_s], row_start[_s] + box_height[_s]):
1024 _row_set = _box_occ.setdefault(_r, set())
1025 for _c in range(col_start[_s], col_start[_s] + box_width[_s]):
1026 _row_set.add(_c)
1028 # Draw self-loop markers immediately below their boxes
1029 for sname, labels in self_loops.items():
1030 marker = "\u21ba " + ", ".join(labels)
1031 r = row_start[sname] + box_height[sname]
1032 if r < total_height:
1033 cx = col_center[sname]
1034 pos = max(0, cx - len(marker) // 2)
1035 for j, ch in enumerate(marker):
1036 if pos + j < total_width:
1037 grid[r][pos + j] = ch
1039 # Shared row tracker: prevents two labels (back-edge, skip-forward, or adjacent)
1040 # landing on the same grid row, which would clobber the first label written there.
1041 used_label_rows: set[int] = set()
1043 # Draw forward edges between layers (vertical arrows with labels)
1044 for li in range(len(layers) - 1):
1045 layer_h = max(box_height[s] for s in layers[li])
1046 has_self_loop = any(s in self_loops for s in layers[li])
1047 self_loop_offset = 1 if has_self_loop else 0
1049 # Arrow area starts after box bottom + self-loop row
1050 arrow_start_row = row_start[layers[li][0]] + layer_h + self_loop_offset
1051 arrow_end_row = row_start[layers[li + 1][0]] - 1
1053 # Collect all inter-layer edges from this layer to the next
1054 inter_edges: list[tuple[str, str, str]] = []
1055 for src in layers[li]:
1056 for dst in layers[li + 1]:
1057 label = forward_edge_labels.get((src, dst))
1058 if label is not None:
1059 inter_edges.append((src, dst, label))
1061 # Draw each edge with its own vertical pipe to the target's center
1062 for src, dst, label in inter_edges:
1063 dst_cc = col_center[dst]
1064 src_left = col_start[src]
1065 src_right = src_left + box_width[src]
1066 ec = _edge_line_color(label) # ANSI code for this edge's connector chars
1068 def _lc(ch: str, _ec: str = ec) -> str: # noqa: E306
1069 return colorize(ch, _ec) if _ec else ch
1071 # Horizontal connector when pipe is outside source box range
1072 if dst_cc >= src_right or dst_cc < src_left:
1073 conn_row = arrow_start_row
1074 if 0 <= conn_row < total_height:
1075 if dst_cc >= src_right:
1076 # Pipe right of source: └───┐
1077 src_cc = col_center[src]
1078 if 0 <= src_cc < total_width and grid[conn_row][src_cc] == " ":
1079 grid[conn_row][src_cc] = _lc("\u2514") # └
1080 start_c = src_cc + 1
1081 else:
1082 start_c = src_right
1083 for c in range(start_c, dst_cc):
1084 if 0 <= c < total_width:
1085 grid[conn_row][c] = _lc("\u2500")
1086 if 0 <= dst_cc < total_width:
1087 grid[conn_row][dst_cc] = _lc("\u2510") # ┐
1088 else:
1089 # Pipe left of source: ┌───┘
1090 src_cc = col_center[src]
1091 if 0 <= src_cc < total_width and grid[conn_row][src_cc] == " ":
1092 end_c = src_cc
1093 grid[conn_row][src_cc] = _lc("\u2518") # ┘
1094 else:
1095 end_c = src_left
1096 for c in range(dst_cc + 1, end_c):
1097 if 0 <= c < total_width:
1098 grid[conn_row][c] = _lc("\u2500")
1099 if 0 <= dst_cc < total_width:
1100 grid[conn_row][dst_cc] = _lc("\u250c") # ┌
1101 pipe_start = arrow_start_row + 1
1102 else:
1103 pipe_start = arrow_start_row
1105 # Draw vertical pipe at destination's center column
1106 for r in range(pipe_start, arrow_end_row):
1107 if 0 <= dst_cc < total_width and r < total_height:
1108 grid[r][dst_cc] = _lc("\u2502")
1110 # Arrow tip at destination center
1111 if arrow_end_row < total_height and 0 <= dst_cc < total_width:
1112 grid[arrow_end_row][dst_cc] = _lc("\u25bc")
1114 # Label to the right of the pipe. When skip-layer forward edges
1115 # exist their vertical pipes occupy columns starting at
1116 # total_content_w+2, so clamp to total_content_w to avoid
1117 # overwriting them (BUG-1500). Without skip-layer edges the right
1118 # margin is empty and the full total_width is available.
1119 label_row = arrow_start_row
1120 # Nudge if this row already has a label from another inter-layer edge
1121 # (e.g., two edges from the same source go to states in the same layer).
1122 # Try pipe rows (pipe_start..arrow_end_row) before giving up (BUG-1501).
1123 if label_row in used_label_rows:
1124 for _cand in range(pipe_start, arrow_end_row + 1):
1125 if _cand not in used_label_rows:
1126 label_row = _cand
1127 break
1128 if label_row < total_height:
1129 used_label_rows.add(label_row)
1130 label_start = dst_cc + 2
1131 max_col = total_content_w if skip_forward_edges else total_width
1132 max_label = max_col - label_start
1133 if 0 < max_label < len(label):
1134 label = label[: max_label - 1] + "…"
1135 for j, ch in enumerate(label):
1136 if label_start + j < max_col:
1137 grid[label_row][label_start + j] = ch
1139 # Post-pass: connect horizontal gaps for multi-branch sources
1140 if len(inter_edges) >= 2 and 0 <= arrow_start_row < total_height:
1141 src_targets: dict[str, list[int]] = {}
1142 for src, dst, _ in inter_edges:
1143 if dst in col_center:
1144 src_targets.setdefault(src, []).append(col_center[dst])
1145 for _src, centers in src_targets.items():
1146 if len(centers) < 2:
1147 continue
1148 left = min(centers)
1149 right = max(centers)
1150 for c in range(left + 1, right):
1151 if 0 <= c < total_width:
1152 cell = grid[arrow_start_row][c]
1153 if cell == " ":
1154 grid[arrow_start_row][c] = "\u2500" # ─
1155 elif cell == "\u2502": # │ → ┼
1156 grid[arrow_start_row][c] = "\u253c"
1157 elif cell == "\u2518": # ┘ → ┴
1158 grid[arrow_start_row][c] = "\u2534"
1159 elif cell == "\u2514": # └ → ┴
1160 grid[arrow_start_row][c] = "\u2534"
1161 elif cell == "\u2510": # ┐ → ┬
1162 grid[arrow_start_row][c] = "\u252c"
1163 elif cell == "\u250c": # ┌ → ┬
1164 grid[arrow_start_row][c] = "\u252c"
1165 # Update boundary junction chars where the horizontal bar meets a pipe
1166 if 0 <= left < total_width and grid[arrow_start_row][left] == "\u2502": # │ → ├
1167 grid[arrow_start_row][left] = "\u251c"
1168 if 0 <= right < total_width and grid[arrow_start_row][right] == "\u2502": # │ → ┤
1169 grid[arrow_start_row][right] = "\u2524"
1171 # Draw same-layer edges (horizontal arrows between states on same layer)
1172 # Includes both branches and reclassified back-edges within same layer
1173 all_same_layer: list[tuple[str, str, str]] = list(same_layer_edges)
1174 for _li, layer in enumerate(layers):
1175 for i, src in enumerate(layer):
1176 for j, dst in enumerate(layer):
1177 if i == j:
1178 continue
1179 label = forward_edge_labels.get((src, dst))
1180 if label is not None and (src, dst, label) not in all_same_layer:
1181 all_same_layer.append((src, dst, label))
1183 for src, dst, label in all_same_layer:
1184 if src not in col_start or dst not in col_start:
1185 continue
1186 name_row = row_start[src] + 1
1187 src_right = col_start[src] + box_width[src]
1188 dst_right = col_start[dst] + box_width[dst]
1189 dst_left = col_start[dst]
1190 src_left = col_start[src]
1191 _row_boxes = _box_occ.get(name_row, set())
1192 ec = _edge_line_color(label)
1194 def _lc(ch: str, _ec: str = ec) -> str: # noqa: E306
1195 return colorize(ch, _ec) if _ec else ch
1197 if dst_left >= src_right:
1198 # Left to right horizontal arrow: src ──label──▶ dst
1199 start = src_right
1200 end = dst_left
1201 edge_text = "\u2500" + label + "\u2500\u2500\u25b6"
1202 available = end - start
1203 if available < len(edge_text):
1204 edge_text = edge_text[: max(1, available)]
1205 left_dashes = max(0, available - len(edge_text))
1206 for k in range(left_dashes):
1207 pos = start + k
1208 if pos < total_width and name_row < total_height and pos not in _row_boxes:
1209 grid[name_row][pos] = _lc("\u2500")
1210 for k, ch in enumerate(edge_text):
1211 pos = start + left_dashes + k
1212 if (
1213 0 <= pos < end
1214 and pos < total_width
1215 and name_row < total_height
1216 and pos not in _row_boxes
1217 ):
1218 grid[name_row][pos] = _lc(ch)
1219 elif dst_right <= src_left:
1220 # Right to left: dst is left of src: src → dst drawn as dst ◀──label── src
1221 start = dst_right
1222 end = src_left
1223 edge_text = "\u25c4\u2500\u2500" + label + "\u2500"
1224 available = end - start
1225 if available < len(edge_text):
1226 edge_text = edge_text[: max(1, available)]
1227 for k, ch in enumerate(edge_text):
1228 pos = start + k
1229 if (
1230 0 <= pos < end
1231 and pos < total_width
1232 and name_row < total_height
1233 and pos not in _row_boxes
1234 ):
1235 grid[name_row][pos] = _lc(ch)
1236 for k in range(start + len(edge_text), end):
1237 if k < total_width and name_row < total_height and k not in _row_boxes:
1238 grid[name_row][k] = _lc("\u2500")
1240 # Back-edges: left-margin vertical arrows with labels
1241 if non_self_back:
1242 sorted_back = sorted(
1243 non_self_back,
1244 key=lambda e: abs(row_start.get(e[0], 0) - row_start.get(e[1], 0)),
1245 reverse=True,
1246 )
1247 used_cols: list[int] = []
1248 # Compute rightmost pipe column so labels go right of ALL pipes
1249 rightmost_pipe_col = 1 + (len(sorted_back) - 1) * 2
1251 for src, dst, label in sorted_back:
1252 # Source: name row of source box; target: name row of target box
1253 src_row = row_start.get(src, 0) + 1 # name row, not bottom border
1254 dst_row = row_start.get(dst, 0) + 1 # name row
1256 # Find a free column in the margin
1257 col = 1
1258 for uc in sorted(used_cols):
1259 if uc == col:
1260 col += 2
1261 used_cols.append(col)
1263 if col >= content_left - 1:
1264 continue
1266 top_row = min(src_row, dst_row)
1267 bot_row = max(src_row, dst_row)
1268 ec = _edge_line_color(label)
1270 def _lc(ch: str, _ec: str = ec) -> str: # noqa: E306
1271 return colorize(ch, _ec) if _ec else ch
1273 # Draw vertical line in margin (exclude corner rows handled below)
1274 for r in range(top_row + 1, bot_row):
1275 if 0 <= r < total_height and col < total_width:
1276 cell = grid[r][col]
1277 if cell == "\u2500": # ─ → ┼ (junction, leave uncolored)
1278 grid[r][col] = "\u253c"
1279 elif cell == " ":
1280 grid[r][col] = _lc("\u2502")
1282 # Horizontal connector from source box to margin
1283 # Draw right-to-left, crossing existing pipes with junction chars
1284 if 0 <= src_row < total_height:
1285 src_left = col_start.get(src, col + 1)
1286 _src_row_boxes = _box_occ.get(src_row, set())
1287 for c in range(col + 1, src_left):
1288 if c < total_width and c not in _src_row_boxes:
1289 cell = grid[src_row][c]
1290 if cell == " ":
1291 grid[src_row][c] = _lc("\u2500") # ─
1292 elif cell == "\u2502": # │ → ┼ (junction)
1293 grid[src_row][c] = "\u253c"
1294 elif cell == "\u2514": # └ → ┴ (junction)
1295 grid[src_row][c] = "\u2534"
1296 elif cell == "\u250c": # ┌ → ┬ (junction)
1297 grid[src_row][c] = "\u252c"
1298 elif cell == "\u251c": # ├ → ┼ (junction)
1299 grid[src_row][c] = "\u253c"
1300 # Leave ─, ▶, box chars unchanged
1302 # Horizontal connector from margin to target box
1303 # Draw right-to-left, crossing existing pipes with junction chars
1304 dst_left = col_start.get(dst, col + 1)
1305 if 0 <= dst_row < total_height:
1306 _dst_row_boxes = _box_occ.get(dst_row, set())
1307 for c in range(col + 1, dst_left):
1308 if c < total_width and c not in _dst_row_boxes:
1309 cell = grid[dst_row][c]
1310 if cell == " ":
1311 grid[dst_row][c] = _lc("\u2500") # ─
1312 elif cell == "\u2502": # │ → ┼ (junction)
1313 grid[dst_row][c] = "\u253c"
1314 elif cell == "\u2514": # └ → ┴ (junction)
1315 grid[dst_row][c] = "\u2534"
1316 elif cell == "\u250c": # ┌ → ┬ (junction)
1317 grid[dst_row][c] = "\u252c"
1318 elif cell == "\u251c": # ├ → ┼ (junction)
1319 grid[dst_row][c] = "\u253c"
1321 # Corner characters at pipe-to-horizontal turn points
1322 for row in (src_row, dst_row):
1323 if 0 <= row < total_height and col < total_width:
1324 existing = grid[row][col]
1325 if row == bot_row:
1326 # Pipe ends, turns right: └; if horizontal already crosses here: ┴
1327 grid[row][col] = "\u2534" if existing == "\u2500" else _lc("\u2514")
1328 else: # row == top_row
1329 # Pipe starts going down, turns right: ┌; if horizontal already crosses here: ┬
1330 grid[row][col] = "\u252c" if existing == "\u2500" else _lc("\u250c")
1332 # Arrow tip at target: place ▶ at end of horizontal connector (entering box from left)
1333 if 0 <= dst_row < total_height and dst_left - 1 > col and dst_left - 1 < total_width:
1334 grid[dst_row][dst_left - 1] = _lc("\u25b6")
1336 # Label on the margin line (right of ALL pipes, not just this one)
1337 label_row_pos = (top_row + bot_row) // 2
1338 # Nudge away from already-used rows to prevent clobbering earlier labels
1339 if label_row_pos in used_label_rows and top_row + 1 < bot_row:
1340 midpoint = label_row_pos
1341 found = False
1342 for _off in range(1, bot_row - top_row):
1343 for _cand in (midpoint - _off, midpoint + _off):
1344 if top_row < _cand < bot_row and _cand not in used_label_rows:
1345 label_row_pos = _cand
1346 found = True
1347 break
1348 if found:
1349 break
1350 if not found:
1351 label_row_pos = top_row + 1
1352 used_label_rows.add(label_row_pos)
1353 if 0 <= label_row_pos < total_height:
1354 label_start = rightmost_pipe_col + 2
1355 for j, ch in enumerate(label):
1356 if label_start + j < content_left - 1 and label_start + j < total_width:
1357 grid[label_row_pos][label_start + j] = _lc(ch)
1359 # Forward skip-layer edges: right-margin vertical arrows with labels
1360 # Symmetric to the left-margin back-edge renderer above
1361 if skip_forward_edges:
1362 sorted_fwd_skip = sorted(
1363 skip_forward_edges,
1364 key=lambda e: abs(row_start.get(e[0], 0) - row_start.get(e[1], 0)),
1365 reverse=True,
1366 )
1367 used_fwd_cols: list[int] = []
1368 # Rightmost pipe column (furthest from content) for label placement
1369 rightmost_fwd_pipe_col = total_content_w + 2 + (len(sorted_fwd_skip) - 1) * 2
1371 for src, dst, label in sorted_fwd_skip:
1372 src_row = row_start.get(src, 0) + 1 # name row
1373 dst_row = row_start.get(dst, 0) + 1 # name row
1375 # Allocate column in right margin (starting from content edge, going right)
1376 col = total_content_w + 2
1377 for uc in sorted(used_fwd_cols):
1378 if uc == col:
1379 col += 2
1380 used_fwd_cols.append(col)
1382 if col >= total_width:
1383 continue
1385 top_row = min(src_row, dst_row)
1386 bot_row = max(src_row, dst_row)
1387 ec = _edge_line_color(label)
1389 def _lc(ch: str, _ec: str = ec) -> str: # noqa: E306
1390 return colorize(ch, _ec) if _ec else ch
1392 # Draw vertical line in right margin (exclude corner rows handled below)
1393 for r in range(top_row + 1, bot_row):
1394 if 0 <= r < total_height and col < total_width:
1395 cell = grid[r][col]
1396 if cell == "\u2500": # ─ → ┼ (junction)
1397 grid[r][col] = "\u253c"
1398 elif cell == " ":
1399 grid[r][col] = _lc("\u2502")
1401 # Horizontal connector from source box right side to margin
1402 # Draw left-to-right, crossing existing pipes with junction chars
1403 src_right = col_start.get(src, 0) + box_width.get(src, 0)
1404 _src_row_boxes = _box_occ.get(src_row, set())
1405 if 0 <= src_row < total_height:
1406 for c in range(src_right, col):
1407 if 0 <= c < total_width and c not in _src_row_boxes:
1408 cell = grid[src_row][c]
1409 if cell == " ":
1410 grid[src_row][c] = _lc("\u2500") # ─
1411 elif cell == "\u2502": # │ → ┼ (junction)
1412 grid[src_row][c] = "\u253c"
1413 elif cell == "\u2518": # ┘ → ┴ (junction)
1414 grid[src_row][c] = "\u2534"
1415 elif cell == "\u2510": # ┐ → ┬ (junction)
1416 grid[src_row][c] = "\u252c"
1417 elif cell == "\u2524": # ┤ → ┼ (junction)
1418 grid[src_row][c] = "\u253c"
1419 # Leave ─, ◀, box chars unchanged
1421 # Horizontal connector from margin to destination box right side
1422 dst_right = col_start.get(dst, 0) + box_width.get(dst, 0)
1423 _dst_row_boxes = _box_occ.get(dst_row, set())
1424 if 0 <= dst_row < total_height:
1425 for c in range(dst_right, col):
1426 if 0 <= c < total_width and c not in _dst_row_boxes:
1427 cell = grid[dst_row][c]
1428 if cell == " ":
1429 grid[dst_row][c] = _lc("\u2500") # ─
1430 elif cell == "\u2502": # │ → ┼ (junction)
1431 grid[dst_row][c] = "\u253c"
1432 elif cell == "\u2518": # ┘ → ┴ (junction)
1433 grid[dst_row][c] = "\u2534"
1434 elif cell == "\u2510": # ┐ → ┬ (junction)
1435 grid[dst_row][c] = "\u252c"
1436 elif cell == "\u2524": # ┤ → ┼ (junction)
1437 grid[dst_row][c] = "\u253c"
1439 # Corner characters at pipe-to-horizontal turn points
1440 for row in (src_row, dst_row):
1441 if 0 <= row < total_height and col < total_width:
1442 existing = grid[row][col]
1443 if row == bot_row:
1444 # Pipe ends, turns left: ┘; if horizontal crosses: ┤
1445 grid[row][col] = "\u2524" if existing == "\u2500" else _lc("\u2518")
1446 else: # row == top_row
1447 # Pipe starts going down, turns left: ┐; if horizontal crosses: ┤
1448 grid[row][col] = "\u2524" if existing == "\u2500" else _lc("\u2510")
1450 # Arrow tip at target: ◀ entering box from right side
1451 if 0 <= dst_row < total_height and dst_right < col and dst_right < total_width:
1452 grid[dst_row][dst_right] = _lc("\u25c0")
1454 # Label on the margin line (right of ALL pipes, mirroring left-margin
1455 # approach). Truncate with … when label would exceed total_width
1456 # to prevent extending diagram lines far past the content area (BUG-1500).
1457 label_row_pos = (top_row + bot_row) // 2
1458 # Nudge to avoid row collision with a previously-placed label (back- or fwd-edge)
1459 if label_row_pos in used_label_rows and top_row + 1 < bot_row:
1460 midpoint = label_row_pos
1461 found = False
1462 for _off in range(1, bot_row - top_row):
1463 for _cand in (midpoint - _off, midpoint + _off):
1464 if top_row < _cand < bot_row and _cand not in used_label_rows:
1465 label_row_pos = _cand
1466 found = True
1467 break
1468 if found:
1469 break
1470 if not found:
1471 label_row_pos = top_row + 1
1472 used_label_rows.add(label_row_pos)
1473 if 0 <= label_row_pos < total_height:
1474 label_start = rightmost_fwd_pipe_col + 2
1475 max_label = total_width - label_start
1476 if 0 < max_label < len(label):
1477 label = label[: max_label - 1] + "…"
1478 for j, ch in enumerate(label):
1479 if label_start + j < total_width:
1480 grid[label_row_pos][label_start + j] = _lc(ch)
1482 # Convert grid to string
1483 lines = ["".join(row).rstrip() for row in grid]
1485 # Remove trailing empty lines
1486 while lines and not lines[-1].strip():
1487 lines.pop()
1489 # Center diagram
1490 max_line_len = max((len(_ANSI_ESCAPE_RE.sub("", ln)) for ln in lines), default=0)
1491 diagram_indent = max(0, (tw - max_line_len) // 2)
1492 if diagram_indent > 0:
1493 lines = [" " * diagram_indent + ln if ln.strip() else ln for ln in lines]
1495 return _colorize_diagram_labels("\n".join(lines), edge_label_colors)
1498# ---------------------------------------------------------------------------
1499# FSM diagram renderer (main entry point)
1500# ---------------------------------------------------------------------------
1503def _render_fsm_diagram(
1504 fsm: FSMLoop,
1505 verbose: bool = False,
1506 highlight_state: str | None = None,
1507 highlight_color: str = "32",
1508 edge_label_colors: dict[str, str] | None = None,
1509 badges: dict[str, str] | None = None,
1510) -> str:
1511 """Render an adaptive text diagram of the FSM graph.
1513 Detects FSM topology and selects appropriate layout:
1514 - Linear chains: vertical top-to-bottom
1515 - Branching/cyclic: layered Sugiyama-style
1517 Args:
1518 fsm: The FSM loop to render.
1519 verbose: If True, show expanded action content in boxes.
1520 highlight_state: If provided, render this state's box with the highlight color.
1521 highlight_color: ANSI SGR code for the highlighted state (default: green).
1522 edge_label_colors: Optional label→SGR-code mapping for transition labels.
1523 Falls back to hardcoded defaults when None.
1524 badges: Optional glyph-key→string mapping for state type badges.
1525 Falls back to hardcoded defaults when None.
1526 """
1527 edges = _collect_edges(fsm)
1528 bfs_order_list, bfs_depth = _bfs_order(fsm.initial, edges)
1529 main_path, main_edge_set = _trace_main_path(fsm, edges)
1530 branches, back_edges = _classify_edges(edges, main_edge_set, bfs_order_list)
1532 terminal_states = {name for name, state in fsm.states.items() if state.terminal}
1534 # Collect all states
1535 all_states = list(main_path)
1536 for src, dst, _ in branches:
1537 for s in (src, dst):
1538 if s not in all_states:
1539 all_states.append(s)
1541 # Topology detection
1542 detector = TopologyDetector(edges, main_path, branches, back_edges)
1543 topology = detector.classify()
1545 # Build back-edge set for layout pipeline
1546 back_edge_set: set[tuple[str, str]] = set()
1547 for src, dst, _ in back_edges:
1548 if src != dst:
1549 back_edge_set.add((src, dst))
1551 tw = terminal_width()
1553 if topology == "linear" and len(all_states) <= 1:
1554 # Single state or empty — use simple horizontal
1555 return _render_horizontal_simple(
1556 main_path,
1557 edges,
1558 main_edge_set,
1559 branches,
1560 back_edges,
1561 bfs_order_list,
1562 fsm.initial,
1563 terminal_states,
1564 fsm.states,
1565 verbose,
1566 highlight_state,
1567 highlight_color,
1568 edge_label_colors,
1569 badges,
1570 )
1572 # Compute max node width to determine width constraint
1573 # Quick estimate: widest state name or badge + padding
1574 max_node_w = 30 # reasonable default
1575 for s in all_states:
1576 st = fsm.states.get(s)
1577 badge = _get_state_badge(st, badges)
1578 badge_w = _badge_display_width(badge) if badge else 0
1579 label = s
1580 if s == fsm.initial:
1581 label = "\u2192 " + label
1582 if s in terminal_states:
1583 label = label + " \u25c9"
1584 w = max(len(label), badge_w)
1585 max_node_w = max(max_node_w, w + 4 + 4) # inner + borders + padding
1587 max_width_per_layer = max(1, (tw - 10) // (max_node_w + 4))
1589 # Layer assignment
1590 assigner = LayerAssigner(all_states, edges, back_edge_set, fsm.initial, max_width_per_layer)
1591 layers = assigner.assign()
1593 # Crossing minimization
1594 minimizer = CrossingMinimizer(layers, edges, back_edge_set)
1595 layers = minimizer.minimize()
1597 return _render_layered_diagram(
1598 layers,
1599 edges,
1600 main_edge_set,
1601 branches,
1602 back_edges,
1603 fsm.initial,
1604 terminal_states,
1605 fsm.states,
1606 verbose,
1607 highlight_state,
1608 highlight_color,
1609 edge_label_colors,
1610 badges,
1611 )
1614def _render_horizontal_simple(
1615 main_path: list[str],
1616 edges: list[tuple[str, str, str]],
1617 main_edge_set: set[tuple[str, str]],
1618 branches: list[tuple[str, str, str]],
1619 back_edges: list[tuple[str, str, str]],
1620 bfs_order: list[str],
1621 initial: str,
1622 terminal_states: set[str],
1623 fsm_states: dict[str, StateConfig],
1624 verbose: bool,
1625 highlight_state: str | None,
1626 highlight_color: str,
1627 edge_label_colors: dict[str, str] | None = None,
1628 badges: dict[str, str] | None = None,
1629) -> str:
1630 """Simple horizontal rendering for single-state or very simple FSMs."""
1631 if not main_path:
1632 return ""
1634 all_states = list(main_path)
1635 display_label = _compute_display_labels(all_states, initial, terminal_states)
1637 tw = terminal_width()
1638 num_main = max(1, len(main_path))
1639 if verbose and fsm_states and main_path:
1640 max_box_inner = max(20, min(60, (tw - 4) // num_main - 6))
1641 else:
1642 max_box_inner = max(20, min(40, (tw - 4) // num_main - 6))
1644 box_inner, box_width, box_height, box_badge = _compute_box_sizes(
1645 all_states, display_label, fsm_states, verbose, max_box_inner, badges
1646 )
1648 main_height = max((box_height[s] for s in main_path), default=3)
1649 total_width = tw
1651 # Column positions
1652 col_start: dict[str, int] = {}
1653 col_center: dict[str, int] = {}
1654 x = 2
1655 for i, sname in enumerate(main_path):
1656 col_start[sname] = x
1657 col_center[sname] = x + box_width[sname] // 2
1658 x += box_width[sname]
1659 if i < len(main_path) - 1:
1660 x += 4
1662 rows: list[list[str]] = [[" "] * total_width for _ in range(main_height)]
1664 for sname in main_path:
1665 is_highlighted = highlight_state is not None and sname == highlight_state
1666 _draw_box(
1667 rows,
1668 0,
1669 col_start[sname],
1670 box_width[sname],
1671 main_height,
1672 box_inner[sname],
1673 is_highlighted,
1674 highlight_color,
1675 badge=box_badge[sname],
1676 )
1678 # Self-loops
1679 self_loops_list = [(s, d, lbl) for s, d, lbl in back_edges if s == d]
1680 lines = ["".join(row).rstrip() for row in rows]
1681 if self_loops_list:
1682 self_labels: dict[str, list[str]] = {}
1683 for src, _, label in self_loops_list:
1684 self_labels.setdefault(src, []).append(label)
1685 for sname, labels in self_labels.items():
1686 marker = "\u21ba " + ", ".join(labels)
1687 self_row = [" "] * total_width
1688 cx = col_center.get(sname, 0)
1689 pos = max(0, cx - len(marker) // 2)
1690 for j, ch in enumerate(marker):
1691 if pos + j < total_width:
1692 self_row[pos + j] = ch
1693 lines.append("".join(self_row).rstrip())
1695 diagram_indent = max(0, (tw - (x + 4)) // 2)
1696 if diagram_indent > 0:
1697 lines = [" " * diagram_indent + ln if ln.strip() else ln for ln in lines]
1699 return _colorize_diagram_labels("\n".join(lines), edge_label_colors)