Coverage for little_loops / cli / loop / info.py: 79%
646 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""ll-loop info subcommands: list, history, show."""
3from __future__ import annotations
5import argparse
6import os
7import re
8from datetime import datetime
9from pathlib import Path
10from typing import Any
12from little_loops.cli.loop._helpers import (
13 get_builtin_loops_dir,
14 load_loop_with_spec,
15 resolve_loop_path,
16)
17from little_loops.cli.loop.layout import ( # noqa: F401
18 _EDGE_LABEL_COLORS,
19 _box_inner_lines,
20 _colorize_diagram_labels,
21 _colorize_label,
22 _render_fsm_diagram,
23)
24from little_loops.cli.output import colorize, print_json, terminal_width
25from little_loops.fsm.schema import FSMLoop, StateConfig
26from little_loops.fsm.validation import load_and_validate
27from little_loops.logger import Logger
def _load_loop_meta(path: Path) -> dict[str, Any]:
    """Return display metadata read from a loop YAML file.

    Args:
        path: Location of the loop definition file.

    Returns:
        A dict with three keys:

        * ``description`` - first line of the loop's description, followed by
          an ellipsis when further lines exist; ``""`` when absent.
        * ``category`` - the loop's category as a string; ``""`` when absent.
        * ``labels`` - list of label strings; ``[]`` when absent.

        All three fields are empty when the file cannot be read or parsed, or
        when its top-level document is not a mapping.
    """
    import yaml

    empty: dict[str, Any] = {"description": "", "category": "", "labels": []}
    try:
        # Read as UTF-8 explicitly instead of relying on the platform's
        # locale encoding.
        with open(path, encoding="utf-8") as f:
            spec = yaml.safe_load(f) or {}
    except Exception:
        # Deliberately best-effort: one unreadable or malformed file must not
        # break the listing of every other loop.
        return empty
    if not isinstance(spec, dict):
        return empty

    # Strip before splitting so leading/trailing blank lines neither become
    # the summary line nor trigger a spurious ellipsis.
    desc_lines = str(spec.get("description", "") or "").strip().splitlines()
    if desc_lines:
        desc = desc_lines[0]
        if len(desc_lines) > 1:
            desc += "…"
    else:
        desc = ""

    category = str(spec.get("category", "") or "")

    # Accept a scalar (``labels: foo``) as a one-element list so callers can
    # always iterate over label strings.
    labels_raw = spec.get("labels", []) or []
    if not isinstance(labels_raw, (list, tuple)):
        labels_raw = [labels_raw]
    labels: list[str] = [str(lb) for lb in labels_raw]

    return {"description": desc, "category": category, "labels": labels}
def _print_running_loops(args: argparse.Namespace, loops_dir: Path) -> int:
    """Print the loops reported by ``list_running_loops`` for *loops_dir*.

    Serves the ``running`` / ``status`` form of the list command.

    Args:
        args: Parsed CLI arguments; ``status`` and ``json`` are read if present.
        loops_dir: Directory handed to ``list_running_loops``.

    Returns:
        1 when a status filter was given and nothing matched, otherwise 0.
    """
    from collections import defaultdict

    from little_loops.fsm.persistence import list_running_loops

    status_filter = getattr(args, "status", None)
    as_json = getattr(args, "json", False)

    states = list_running_loops(loops_dir)
    if status_filter:
        states = [s for s in states if s.status == status_filter]

    if not states:
        if as_json:
            # Keep stdout parseable in JSON mode, exactly as the catalogue
            # listing in cmd_list does for its own empty results.
            print_json([])
        elif status_filter:
            print(f"No loops with status: {status_filter}")
        else:
            print("No running loops")
        return 1 if status_filter else 0

    if as_json:
        print_json([s.to_dict() for s in states])
        return 0

    print(colorize("Running loops:", "1"))
    status_colors = {"running": "32", "interrupted": "33", "stopped": "2", "starting": "33"}

    # Group by loop_name to avoid duplicate rows for multi-instance loops
    groups: dict[str, list] = defaultdict(list)
    for state in states:
        groups[state.loop_name].append(state)

    for loop_name_key, group_states in groups.items():
        if len(group_states) == 1:
            state = group_states[0]
            elapsed_s = (state.accumulated_ms or 0) // 1000
            elapsed_str = (
                f"{elapsed_s}s" if elapsed_s < 60 else f"{elapsed_s // 60}m {elapsed_s % 60}s"
            )
            name_str = colorize(state.loop_name, "1")
            state_str = colorize(state.current_state, "34")
            status_color = status_colors.get(state.status, "2")
            # "interrupted" is presented to the user as "paused"
            display_status = "paused" if state.status == "interrupted" else state.status
            status_str = colorize(f"[{display_status}]", status_color)
            elapsed_colored = colorize(elapsed_str, "2")
            print(
                f" {name_str}: {state_str} (iteration {state.iteration})"
                f" {status_str} {elapsed_colored}"
            )
        else:
            # Multiple instances: show a grouped summary
            name_str = colorize(loop_name_key, "1")
            statuses = ", ".join(
                colorize(
                    f"[{'paused' if s.status == 'interrupted' else s.status}]",
                    status_colors.get(s.status, "2"),
                )
                for s in group_states
            )
            count_str = colorize(f"({len(group_states)} instances)", "2")
            print(f" {name_str}: {count_str} {statuses}")
    return 0


def cmd_list(
    args: argparse.Namespace,
    loops_dir: Path,
) -> int:
    """List loops.

    With ``running`` or ``status`` set on *args*, lists loops that have run
    state (see ``_print_running_loops``). Otherwise lists the loop definitions
    found in *loops_dir* plus the built-in loops that are not overridden by a
    project loop of the same name, optionally narrowed by ``builtin``,
    ``category`` and ``label``, either as JSON or grouped by category.

    Args:
        args: Parsed CLI arguments. Every option is read with ``getattr`` and
            a default, so attributes may be missing.
        loops_dir: Project loops directory.

    Returns:
        Process exit code: 1 only when a status filter matched no loops,
        otherwise 0.
    """
    if getattr(args, "running", False) or getattr(args, "status", None):
        return _print_running_loops(args, loops_dir)

    as_json = getattr(args, "json", False)
    builtin_only = getattr(args, "builtin", False)

    # Collect project loops (skipped when --builtin is set)
    project_names: set[str] = set()
    yaml_files: list[Path] = []
    if not builtin_only and loops_dir.exists():
        yaml_files = sorted(loops_dir.glob("*.yaml"))
        project_names = {p.stem for p in yaml_files}

    # Collect built-in loops (excluding those overridden by project)
    builtin_dir = get_builtin_loops_dir()
    builtin_files: list[Path] = []
    if builtin_dir.exists():
        builtin_files = [
            f for f in sorted(builtin_dir.glob("*.yaml")) if f.stem not in project_names
        ]

    if not yaml_files and not builtin_files:
        if as_json:
            print_json([])
            return 0
        print("No loops available")
        return 0

    # Build combined metadata list
    all_loops: list[dict[str, Any]] = []
    for path in yaml_files:
        meta = _load_loop_meta(path)
        all_loops.append({"name": path.stem, "path": path, "builtin": False, **meta})
    for path in builtin_files:
        meta = _load_loop_meta(path)
        all_loops.append({"name": path.stem, "path": path, "builtin": True, **meta})

    # Apply --category filter
    category_filter = getattr(args, "category", None)
    if category_filter:
        all_loops = [lp for lp in all_loops if lp["category"] == category_filter]

    # Apply --label filter (action="append" → list or None)
    label_filters: list[str] = getattr(args, "label", None) or []
    if label_filters:
        # A loop is kept when ANY requested label matches, case-insensitively.
        all_loops = [
            lp
            for lp in all_loops
            if any(lf.lower() in [lb.lower() for lb in lp["labels"]] for lf in label_filters)
        ]

    if not all_loops:
        if as_json:
            print_json([])
            return 0
        print("No loops match the given filters")
        return 0

    if as_json:
        items: list[dict[str, Any]] = []
        for lp in all_loops:
            item: dict[str, Any] = {
                "name": lp["name"],
                "path": str(lp["path"]),
                "category": lp["category"],
                "labels": lp["labels"],
            }
            # The key is only present for built-in loops.
            if lp["builtin"]:
                item["built_in"] = True
            items.append(item)
        print_json(items)
        return 0

    # Human-readable: group by category
    buckets: dict[str, list[dict[str, Any]]] = {}
    for lp in all_loops:
        buckets.setdefault(lp["category"] or "uncategorized", []).append(lp)

    # Sort categories; "uncategorized" always last
    sorted_cats = sorted(c for c in buckets if c != "uncategorized")
    if "uncategorized" in buckets:
        sorted_cats.append("uncategorized")

    # Compute max name width for column alignment
    max_name_len = max((len(lp["name"]) for lp in all_loops), default=0)
    name_col = max_name_len + 2  # padding after longest name
    tw = terminal_width()

    for index, cat in enumerate(sorted_cats):
        group = buckets[cat]
        if index:
            print()  # blank line between category groups
        print(colorize(f"{cat} ({len(group)}):", "1"))
        for lp in group:
            # Name: project loops get bold cyan, built-in loops get dimmer cyan
            name_color = "36" if lp["builtin"] else "36;1"
            # Pad before colorizing so escape codes do not eat into the width.
            name_str = colorize(lp["name"].ljust(name_col), name_color)

            # Suffix: labels + [built-in] tag
            suffix_parts: list[str] = [colorize(f"[{label}]", "2") for label in lp["labels"]]
            if lp["builtin"]:
                suffix_parts.append(colorize("[built-in]", "2"))

            if suffix_parts:
                suffix_raw = " " + " ".join(suffix_parts)
                # Measure what is visible, not the escape codes.
                suffix_visible = len(_strip_ansi(suffix_raw))
            else:
                suffix_raw = ""
                suffix_visible = 0

            # Available width for description: indent + name_col + " " + desc + suffix
            avail = tw - 2 - name_col - 2 - suffix_visible
            desc_text = lp["description"] or ""
            if desc_text and avail < len(desc_text):
                # Never squeeze the description below 20 characters.
                desc_text = _truncate(desc_text, max(avail, 20))
            desc_str = f" {colorize(desc_text, '2')}" if desc_text else ""

            print(f" {name_str}{desc_str}{suffix_raw}")
    return 0
239_EVENT_TYPE_WIDTH = 16 # width of "handoff_detected"
241_ANSI_RE = re.compile(r"\033\[[0-9;]*m")
244def _strip_ansi(text: str) -> str:
245 return _ANSI_RE.sub("", text)
248def _truncate(text: str, max_len: int) -> str:
249 """Truncate text to max_len with ellipsis."""
250 if max_len < 1:
251 return ""
252 if len(text) <= max_len:
253 return text
254 return text[: max_len - 1] + "\u2026"
257def _format_history_event(
258 event: dict[str, Any], verbose: bool, width: int, full: bool = False
259) -> str | None:
260 """Format a single history event. Returns None to skip the event."""
261 raw_ts = event.get("ts", "")
262 try:
263 ts = datetime.fromisoformat(raw_ts).strftime("%H:%M:%S")
264 except (ValueError, TypeError):
265 ts = raw_ts[:8] if len(raw_ts) >= 8 else raw_ts.ljust(8)
267 event_type = event.get("event", "unknown")
269 if event_type == "action_output" and not verbose:
270 return None
272 ts_str = colorize(ts, "2")
273 etype_padded = event_type.ljust(_EVENT_TYPE_WIDTH)
274 etype_color = "0"
275 detail = ""
276 extra_lines: list[str] = []
278 # Indentation prefix for verbose sub-lines (aligns under event detail column)
279 _indent = " " * (8 + 2 + _EVENT_TYPE_WIDTH + 2)
281 if event_type == "loop_start":
282 etype_color = "1"
283 detail = event.get("loop", "")
285 elif event_type == "loop_complete":
286 etype_color = "1"
287 final_state = event.get("final_state", "")
288 iterations = event.get("iterations", "")
289 terminated_by = event.get("terminated_by", "")
290 detail = f"{final_state} {iterations} iter [{terminated_by}]"
292 elif event_type == "loop_resume":
293 etype_color = "1"
294 from_state = event.get("from_state", "")
295 iteration = event.get("iteration", "")
296 detail = f"from={from_state} iter={iteration}"
298 elif event_type == "state_enter":
299 etype_color = "34"
300 state = event.get("state", "")
301 iteration = event.get("iteration", "")
302 detail = f"{colorize(state, '1')} (iter {iteration})"
304 elif event_type == "action_start":
305 action = event.get("action", "")
306 is_prompt = event.get("is_prompt", False)
307 kind_label = "prompt" if is_prompt else "shell"
308 kind_str = colorize(f"[{kind_label}]", "2")
309 first_line = (
310 next((ln.strip() for ln in action.splitlines() if ln.strip()), "")
311 if is_prompt
312 else action
313 )
314 avail = width - 8 - 2 - _EVENT_TYPE_WIDTH - 2 - len(kind_label) - 2 - 2
315 detail = f"{_truncate(first_line, max(avail, 20))} {kind_str}"
317 elif event_type == "action_output":
318 # Only reached in verbose mode
319 etype_color = "2"
320 detail = colorize("\u2502 " + event.get("line", ""), "2")
322 elif event_type == "action_complete":
323 exit_code = event.get("exit_code", 0)
324 duration_ms = event.get("duration_ms", 0)
325 if exit_code == 0:
326 etype_color = "2"
327 status_str = colorize("\u2713", "32")
328 else:
329 etype_color = "38;5;208"
330 status_str = colorize(f"\u2717 exit={exit_code}", "38;5;208")
331 detail = f"{status_str} {duration_ms}ms"
332 is_prompt = event.get("is_prompt", False)
333 session_jsonl = event.get("session_jsonl") if is_prompt else None
334 if session_jsonl:
335 session_display = session_jsonl if verbose else os.path.basename(session_jsonl)
336 detail += f" session={colorize(session_display, '2')}"
337 if verbose:
338 output_preview = event.get("output_preview", "")
339 if output_preview:
340 avail_w = width - len(_indent) - 2
341 preview_text = (
342 output_preview if full else _truncate(output_preview, max(avail_w, 40))
343 )
344 for preview_line in preview_text.splitlines()[:5]:
345 extra_lines.append(colorize(_indent + "\u2502 " + preview_line, "2"))
347 elif event_type == "evaluate":
348 verdict = event.get("verdict", "")
349 confidence = event.get("confidence", "")
350 reason = event.get("reason", "")
351 if verdict == "yes":
352 etype_color = "32"
353 verdict_str = colorize("\u2713 yes", "32")
354 else:
355 etype_color = "38;5;208"
356 verdict_str = colorize(f"\u2717 {verdict}", "38;5;208")
357 conf_part = f" confidence={confidence}" if confidence != "" else ""
358 avail = width - 8 - 2 - _EVENT_TYPE_WIDTH - 2 - len("\u2713 yes") - len(conf_part) - 2
359 reason_part = f" {_truncate(reason, max(avail, 20))}" if reason else ""
360 detail = f"{verdict_str}{conf_part}{reason_part}"
361 if verbose:
362 llm_model = event.get("llm_model", "")
363 llm_latency_ms = event.get("llm_latency_ms", "")
364 llm_prompt = event.get("llm_prompt", "")
365 llm_raw_output = event.get("llm_raw_output", "")
366 if llm_model or llm_prompt:
367 meta_parts = []
368 if llm_model:
369 meta_parts.append(f"model={llm_model}")
370 if llm_latency_ms != "":
371 meta_parts.append(f"latency={llm_latency_ms}ms")
372 meta_str = " ".join(meta_parts)
373 extra_lines.append(
374 colorize(_indent + colorize("LLM Call", "2") + " " + meta_str, "2")
375 )
376 avail_w = width - len(_indent) - len("Prompt: ") - 2
377 if llm_prompt:
378 prompt_text = llm_prompt if full else _truncate(llm_prompt, max(avail_w, 40))
379 extra_lines.append(colorize(_indent + "Prompt: " + prompt_text, "2"))
380 if llm_raw_output:
381 resp_text = (
382 llm_raw_output if full else _truncate(llm_raw_output, max(avail_w, 40))
383 )
384 extra_lines.append(colorize(_indent + "Response: " + resp_text, "2"))
386 elif event_type == "route":
387 etype_color = "2"
388 from_state = event.get("from", "")
389 to_state = event.get("to", "")
390 detail = f"{from_state} \u2192 {colorize(to_state, '34')}"
392 elif event_type == "handoff_detected":
393 etype_color = "33"
394 detail = f"state={event.get('state', '')} iter={event.get('iteration', '')}"
396 else:
397 details = {k: v for k, v in event.items() if k not in ("event", "ts")}
398 detail = " ".join(f"{k}={v}" for k, v in details.items())
400 etype_str = colorize(etype_padded, etype_color)
401 main_line = f"{ts_str} {etype_str} {detail}"
402 if extra_lines:
403 return "\n".join([main_line] + extra_lines)
404 return main_line
407def _format_duration(ms: int) -> str:
408 """Format milliseconds as a human-readable duration."""
409 if ms < 1000:
410 return f"{ms}ms"
411 s = ms // 1000
412 if s < 60:
413 return f"{s}s"
414 m, s = divmod(s, 60)
415 if m < 60:
416 return f"{m}m{s:02d}s"
417 h, m = divmod(m, 60)
418 return f"{h}h{m:02d}m{s:02d}s"
def _list_archived_runs(loop_name: str, loops_dir: Path, as_json: bool) -> int:
    """List archived runs for a loop.

    Args:
        loop_name: Loop whose archived runs are listed.
        loops_dir: Loops directory; archives are looked up in its
            ``HISTORY_DIR`` sub-directory.
        as_json: Print a JSON array (one object per run) instead of text.

    Returns:
        Always 0, including when no history exists.
    """
    import json as _json

    from little_loops.fsm.persistence import HISTORY_DIR, LoopState

    # Flat layout: run dirs are <run_id>-<loop_name> directly under .history/
    # NOTE(review): suffix matching also accepts runs of any loop whose name
    # ends with "-<loop_name>" - confirm against the run-id format.
    suffix = f"-{loop_name}"
    history_base = loops_dir / HISTORY_DIR
    runs: list[tuple[str, LoopState | None]] = []
    if history_base.exists():
        # Reverse name order.
        for run_dir in sorted(history_base.iterdir(), key=lambda d: d.name, reverse=True):
            if not run_dir.is_dir() or not run_dir.name.endswith(suffix):
                continue
            run_id = run_dir.name[: -len(suffix)]
            state_file = run_dir / "state.json"
            state: LoopState | None = None
            if state_file.exists():
                try:
                    data = _json.loads(state_file.read_text(encoding="utf-8"))
                    state = LoopState.from_dict(data)
                except (ValueError, KeyError, TypeError, OSError):
                    # Unreadable or malformed state: still list the run, with
                    # unknown details.
                    pass
            runs.append((run_id, state))

    if not runs:
        if as_json:
            # Keep stdout parseable for JSON consumers.
            print(_json.dumps([], indent=2))
        else:
            print(f"No history for: {loop_name}")
        return 0

    if as_json:
        print(
            _json.dumps(
                [
                    {
                        "run_id": rid,
                        "status": s.status if s else None,
                        "started_at": s.started_at if s else None,
                        "iterations": s.iteration if s else None,
                        "duration_ms": s.accumulated_ms if s else None,
                    }
                    for rid, s in runs
                ],
                indent=2,
            )
        )
        return 0

    # Colour codes per status, applied through colorize() like every other
    # command in this module rather than by writing escape sequences directly.
    status_colors = {
        "completed": "32",
        "failed": "31",
        "interrupted": "33",
        "awaiting_continuation": "36",
        "timed_out": "33",
        "running": "34",
    }

    print(f"Archived runs for: {loop_name} ({len(runs)} total)")
    print()

    for run_id, state in runs:
        if state is not None:
            code = status_colors.get(state.status)
            status_str = colorize(state.status, code) if code else state.status
            # A zero or missing duration is shown as unknown.
            duration_str = _format_duration(state.accumulated_ms) if state.accumulated_ms else "?"
            # "YYYY-MM-DDTHH:MM:SS" prefix of the timestamp, with a space for "T".
            started = state.started_at[:19].replace("T", " ") if state.started_at else "?"
            iters = f"{state.iteration} iters"
        else:
            status_str = "unknown"
            duration_str = "?"
            started = "?"
            iters = "?"
        print(f" {run_id} {status_str} {started} {iters} {duration_str}")

    print()
    print(f"To view events: ll-loop history {loop_name} <run-id>")
    return 0
def _event_timestamp(event: dict[str, Any]) -> datetime | None:
    """Return the event's ``ts`` as a timezone-aware datetime, or None if unusable."""
    raw = event.get("ts")
    if not isinstance(raw, str):
        return None
    try:
        # astimezone() treats a naive value as local time, which is how naive
        # timestamps were compared against datetime.now() before, and
        # converts an aware value instead of discarding its offset.
        return datetime.fromisoformat(raw.replace("Z", "+00:00")).astimezone()
    except (ValueError, OverflowError, OSError):
        return None


def cmd_history(
    loop_name: str,
    run_id: str | None,
    args: argparse.Namespace,
    loops_dir: Path,
) -> int:
    """Show loop history.

    Without run_id: lists all archived runs with status and duration.
    With run_id: shows events for that specific archived run.

    Args:
        loop_name: Loop whose history is shown.
        run_id: Archived run to display, or None to list the runs.
        args: Parsed CLI arguments; ``tail``, ``full``, ``verbose``, ``json``,
            ``event``, ``state`` and ``since`` are read if present.
        loops_dir: Loops directory holding the archives.

    Returns:
        1 when the requested run has no events, otherwise 0 (or the result of
        ``_list_archived_runs`` when *run_id* is None).
    """
    tail = getattr(args, "tail", 50)
    full = getattr(args, "full", False)
    verbose = getattr(args, "verbose", False) or full  # --full implies verbose
    as_json = getattr(args, "json", False)

    if run_id is None:
        return _list_archived_runs(loop_name, loops_dir, as_json)

    # Show events for a specific archived run
    from little_loops.fsm.persistence import get_archived_events

    events = get_archived_events(loop_name, run_id, loops_dir)

    if not events:
        print(f"No events found for run {run_id} of loop {loop_name}")
        return 1

    w = terminal_width()
    if not verbose:
        events = [e for e in events if e.get("event") != "action_output"]

    # Apply optional filters (before --tail slice)
    event_filter = getattr(args, "event", None)
    if event_filter:
        events = [e for e in events if e.get("event") == event_filter]

    state_filter = getattr(args, "state", None)
    if state_filter:
        # Match the state wherever an event names it: as its own state or as
        # either end of a transition.
        events = [
            e
            for e in events
            if (
                e.get("state") == state_filter
                or e.get("from") == state_filter
                or e.get("to") == state_filter
            )
        ]

    since_str = getattr(args, "since", None)
    if since_str:
        from datetime import timedelta

        from little_loops.text_utils import parse_duration

        # Compare aware datetimes on both sides; stripping tzinfo from a UTC
        # timestamp and comparing it with local wall-clock time is off by the
        # UTC offset. Events without a usable timestamp are excluded.
        cutoff = datetime.now().astimezone() - timedelta(seconds=parse_duration(since_str))
        stamped = ((e, _event_timestamp(e)) for e in events)
        events = [e for e, ts in stamped if ts is not None and ts >= cutoff]

    # NOTE: events[-tail:] returns every event when tail is 0.
    if as_json:
        print_json(events[-tail:])
        return 0
    for event in events[-tail:]:
        line = _format_history_event(event, verbose, w, full=full)
        if line is not None:
            print(line)

    return 0
576# ---------------------------------------------------------------------------
577# FSM diagram renderer — delegated to layout module (re-exported above)
578# ---------------------------------------------------------------------------
581# ---------------------------------------------------------------------------
582# State overview table
583# ---------------------------------------------------------------------------
586def _compact_transitions(state: StateConfig) -> str:
587 """Return a compact transition string for the overview table."""
588 raw: list[tuple[str, str]] = []
589 for label, target in [
590 ("yes", state.on_yes),
591 ("no", state.on_no),
592 ("error", state.on_error),
593 ("partial", state.on_partial),
594 ("next", state.next),
595 ]:
596 if target:
597 raw.append((label, target))
598 if state.route:
599 for verdict, target in state.route.routes.items():
600 raw.append((verdict, target))
601 if state.route.default:
602 raw.append(("_", state.route.default))
603 if not raw:
604 return "\u2014"
605 # Group by target, preserving first-seen order
606 seen: list[str] = []
607 by_target: dict[str, list[str]] = {}
608 for label, target in raw:
609 if target not in by_target:
610 seen.append(target)
611 by_target[target] = []
612 by_target[target].append(label)
613 return ", ".join(f"{'/'.join(by_target[t])}\u2192{t}" for t in seen)
def _print_state_overview_table(fsm: FSMLoop) -> None:
    """Print a compact summary table of all states.

    One row per state with four columns: name (the initial state is marked
    with an arrow), action type, first non-blank line of the action, and the
    outgoing transitions. Action previews and transitions are truncated to
    column widths derived from the terminal width. Prints nothing when the
    loop has no states.

    Args:
        fsm: Loop whose states are tabulated.
    """
    rows: list[tuple[str, str, str, str]] = []
    for name, state in fsm.states.items():
        # State name column
        state_col = f"\u2192 {name}" if name == fsm.initial else f" {name}"

        # Type column
        if state.terminal:
            type_col = "\u2014"
        elif state.action_type:
            type_col = state.action_type
        elif state.action:
            # An action without an explicit type is shown as "shell".
            type_col = "shell"
        else:
            type_col = "\u2014"

        # Action preview column
        if state.terminal:
            action_col = "(terminal)"
        elif state.action:
            src_lines = [ln.rstrip() for ln in state.action.strip().splitlines() if ln.rstrip()]
            action_col = src_lines[0] if src_lines else "\u2014"
        else:
            action_col = "\u2014"

        # Transitions column
        trans_col = _compact_transitions(state)
        rows.append((state_col, type_col, action_col, trans_col))

    if not rows:
        return

    tw = terminal_width()
    headers = ("State", "Type", "Action Preview", "Transitions")
    col0_w = max(len(headers[0]), max(len(r[0]) for r in rows))
    col1_w = max(len(headers[1]), max(len(r[1]) for r in rows))
    # Remaining width split between action preview and transitions
    fixed = col0_w + col1_w + 10  # margins + separators
    remaining = max(20, tw - fixed)
    # Action preview: as wide as its content, capped at 50 characters and at
    # three fifths of the remaining width, but never below 10.
    col2_w = min(50, max(len(headers[2]), max(len(r[2]) for r in rows)), remaining * 3 // 5)
    col2_w = max(10, col2_w)
    col3_w = max(10, remaining - col2_w)

    print(f" {headers[0]:<{col0_w}} {headers[1]:<{col1_w}} {headers[2]:<{col2_w}} {headers[3]}")
    dash = "\u2500"
    print(f" {dash * col0_w} {dash * col1_w} {dash * col2_w} {dash * col3_w}")
    for state_col, type_col, action_col, trans_col in rows:
        action_col = _truncate(action_col, col2_w)
        trans_col = _truncate(trans_col, col3_w)
        # Pad first, colorize second: escape codes added by colorize() would
        # otherwise count towards the field width and shift later columns.
        type_cell = type_col.ljust(col1_w)
        if type_col == "\u2014":
            type_cell = colorize(type_cell, "2")
        print(
            f" {state_col:<{col0_w}} {type_cell} "
            f"{action_col:<{col2_w}} {trans_col}"
        )
675# ---------------------------------------------------------------------------
676# cmd_show
677# ---------------------------------------------------------------------------
679_EVALUATE_TYPE_DISPLAY: dict[str, str] = {
680 "llm": "LLM",
681 "llm_structured": "LLM (structured)",
682 "exit_code": "exit code",
683 "output_numeric": "numeric",
684 "output_contains": "contains",
685 "output_json": "JSON",
686 "convergence": "convergence",
687}
690def _humanize_evaluate_type(ev_type: str) -> str:
691 return _EVALUATE_TYPE_DISPLAY.get(ev_type, ev_type)
def _print_state_details(fsm: FSMLoop, tw: int) -> None:
    """Print the verbose per-state section: header rule, action, evaluator, transitions.

    Only called in verbose mode, so actions and evaluator prompts are always
    printed in full.
    """
    print()
    print("States:")
    for index, (name, state) in enumerate(fsm.states.items()):
        if index:
            print()  # blank line between states

        # State section header: ── name ──── MARKERS · type ──
        right_parts = []
        if name == fsm.initial:
            right_parts.append("INITIAL")
        if state.terminal:
            right_parts.append("TERMINAL")
        if state.action_type:
            right_parts.append(state.action_type)
        right_info = " \u00b7 ".join(right_parts)
        inner_left = f"\u2500\u2500 {name} "
        inner_right = f" {right_info} \u2500\u2500" if right_info else " \u2500\u2500"
        fill = "\u2500" * max(0, tw - 2 - len(inner_left) - len(inner_right))
        print(f" {inner_left}{fill}{inner_right}")

        if state.action:
            indented = "\n ".join(state.action.strip().splitlines())
            print(f" action:\n {indented}")
        if state.evaluate:
            ev = state.evaluate
            print(f" evaluate: {_humanize_evaluate_type(ev.type)}")
            if ev.prompt:
                print(" prompt:")
                for line in ev.prompt.strip().splitlines():
                    print(f" \u2502 {line}")
            # 0.5 is treated as the default and not printed.
            if ev.min_confidence != 0.5:
                print(f" min_confidence: {ev.min_confidence}")
            if ev.operator:
                print(f" operator: {ev.operator} {ev.target}")
            if ev.pattern:
                print(f" pattern: {ev.pattern}")
        if state.capture:
            print(f" capture: {state.capture}")
        if state.timeout:
            print(f" timeout: {state.timeout}s")

        # Collect (label, target) pairs
        raw_transitions: list[tuple[str, str]] = [
            (label, target)
            for label, target in (
                ("yes", state.on_yes),
                ("no", state.on_no),
                ("error", state.on_error),
                ("partial", state.on_partial),
                ("next", state.next),
                ("maintain", state.on_maintain),
            )
            if target
        ]
        if state.route:
            raw_transitions.extend(state.route.routes.items())
            if state.route.default:
                raw_transitions.append(("_", state.route.default))

        # Group by target, preserving first-seen order
        target_labels: dict[str, list[str]] = {}
        for label, target in raw_transitions:
            target_labels.setdefault(target, []).append(label)
        transitions = [
            f"{_colorize_label('/'.join(labels))} \u2500\u2500\u2192 {target}"
            for target, labels in target_labels.items()
        ]
        if transitions:
            print(" Transitions:")
            for t in transitions:
                print(f" {t}")


def cmd_show(
    loop_name: str,
    args: argparse.Namespace,
    loops_dir: Path,
    logger: Logger,
) -> int:
    """Show loop details and structure.

    Prints a header with state/transition counts, a configuration line, the
    description, the FSM diagram, the state overview table, per-state details
    (verbose only) and the relevant commands. With ``json`` set on *args* the
    loop is printed as JSON instead; ``resolved`` additionally embeds the
    states of referenced sub-loops under ``_subloop``.

    Args:
        loop_name: Name of the loop to show.
        args: Parsed CLI arguments; ``json``, ``resolved`` and ``verbose`` are
            read if present.
        loops_dir: Project loops directory used to resolve the loop.
        logger: Receives the error message when the loop cannot be loaded.

    Returns:
        1 when the loop is missing or invalid, otherwise 0.
    """
    try:
        path = resolve_loop_path(loop_name, loops_dir)
        fsm, spec = load_loop_with_spec(loop_name, loops_dir, logger)
    except FileNotFoundError as e:
        logger.error(str(e))
        return 1
    except ValueError as e:
        logger.error(f"Invalid loop: {e}")
        return 1

    if getattr(args, "json", False):
        data = fsm.to_dict()
        if getattr(args, "resolved", False):
            for state in data.get("states", {}).values():
                if "loop" in state:
                    try:
                        child_path = resolve_loop_path(state["loop"], loops_dir)
                        child_fsm, _ = load_and_validate(child_path)
                        state["_subloop"] = child_fsm.to_dict().get("states", {})
                    except (FileNotFoundError, ValueError):
                        # Best-effort: an unresolvable sub-loop is left
                        # unexpanded rather than failing the command.
                        pass
        print_json(data)
        return 0

    tw = terminal_width()

    # Compute stats for header
    n_states = len(fsm.states)
    n_transitions = sum(
        bool(s.on_yes)
        + bool(s.on_no)
        + bool(s.on_error)
        + bool(s.on_partial)
        + bool(s.next)
        + bool(s.on_maintain)
        + (len(s.route.routes) + bool(s.route.default) if s.route else 0)
        for s in fsm.states.values()
    )

    # --- Compact metadata header ---
    # Line 1: ── name ───────── N states · M transitions ──
    stats_str = " \u00b7 ".join([f"{n_states} states", f"{n_transitions} transitions"])

    header_left = f"\u2500\u2500 {loop_name} "
    header_right = f" {stats_str} \u2500\u2500"
    dashes = "\u2500" * max(0, tw - len(header_left) - len(header_right))
    print(f"{header_left}{dashes}{header_right}")

    # Line 2: source · max: N iter · handoff: X [· optional fields]
    config_parts: list[str] = [str(path), f"max: {fsm.max_iterations} iter"]
    config_parts.append(f"handoff: {fsm.on_handoff}")
    if fsm.timeout:
        config_parts.append(f"timeout: {fsm.timeout}s")
    if fsm.backoff:
        config_parts.append(f"backoff: {fsm.backoff}s")
    if fsm.maintain:
        config_parts.append("maintain: yes")
    if fsm.context:
        config_parts.append(f"context: {', '.join(fsm.context.keys())}")
    if fsm.scope:
        config_parts.append(f"scope: {', '.join(fsm.scope)}")
    # LLM settings are shown only when they differ from the values compared
    # against here.
    llm = fsm.llm
    llm_parts = []
    if llm.model != "sonnet":
        llm_parts.append(f"model={llm.model}")
    if llm.max_tokens != 256:
        llm_parts.append(f"max_tokens={llm.max_tokens}")
    if llm.timeout != 30:
        llm_parts.append(f"timeout={llm.timeout}s")
    if llm_parts:
        config_parts.append(f"llm: {', '.join(llm_parts)}")
    if fsm.config is not None:
        cfg_parts = []
        if fsm.config.handoff_threshold is not None:
            cfg_parts.append(f"handoff_threshold={fsm.config.handoff_threshold}")
        if fsm.config.readiness_threshold is not None:
            cfg_parts.append(f"readiness_threshold={fsm.config.readiness_threshold}")
        if fsm.config.outcome_threshold is not None:
            cfg_parts.append(f"outcome_threshold={fsm.config.outcome_threshold}")
        if fsm.config.max_continuations is not None:
            cfg_parts.append(f"max_continuations={fsm.config.max_continuations}")
        if cfg_parts:
            config_parts.append(f"config: {', '.join(cfg_parts)}")
    imports = spec.get("import") or []
    if isinstance(imports, str):
        # A scalar ``import: lib.yaml`` must not be joined character by character.
        imports = [imports]
    if imports:
        config_parts.append(f"imports: {', '.join(imports)}")
    print(" " + " \u00b7 ".join(config_parts))

    # --- Description ---
    # ``description:`` with no value loads as None; treat it like a missing key.
    description = (spec.get("description") or "").strip()
    if description:
        print()
        print("Description:")
        for line in description.splitlines():
            print(f" {line}")

    # --- ASCII FSM Diagram ---
    verbose = getattr(args, "verbose", False)

    from little_loops.config import BRConfig

    badges = BRConfig(Path.cwd()).loops.glyphs.to_dict()
    print()
    print("Diagram:")
    diagram = _render_fsm_diagram(fsm, verbose=verbose, badges=badges)
    if diagram:
        print(diagram)

    # --- State overview table ---
    print()
    _print_state_overview_table(fsm)

    # --- States & Transitions (verbose only) ---
    if verbose:
        _print_state_details(fsm, tw)

    # --- Commands ---
    print()
    print("Commands:")
    if fsm.commands:
        cmds = [(e.cmd, e.comment) for e in fsm.commands]
    else:
        cmds = [
            (f"ll-loop run {loop_name}", "run"),
            (f"ll-loop test {loop_name}", "single test iteration"),
            (f"ll-loop stop {loop_name}", "stop a running loop"),
            (f"ll-loop status {loop_name}", "check if running"),
            (f"ll-loop history {loop_name}", "execution history"),
        ]
    col_width = max(len(c) for c, _ in cmds) + 2
    for cmd, comment in cmds:
        print(f" {cmd:<{col_width}} # {comment}")

    return 0
def cmd_fragments(
    lib_path: str,
    args: argparse.Namespace,
    loops_dir: Path,
    logger: Logger,
) -> int:
    """List fragments in a library file with their descriptions.

    Resolves the library path relative to loops_dir or the built-in loops directory,
    then prints a table of fragment names and their description fields.

    Args:
        lib_path: Path of the fragment library, tried as given, then under
            *loops_dir*, then under the built-in loops directory.
        args: Parsed CLI arguments (not read by this command).
        loops_dir: Project loops directory.
        logger: Receives the error message when the library cannot be found,
            loaded or interpreted.

    Returns:
        1 when the library is missing, unreadable or not shaped as a mapping
        of fragments, otherwise 0 (including when it defines no fragments).
    """
    import yaml

    # Resolve path: absolute/direct → loops_dir-relative → builtin
    p = Path(lib_path)
    if not p.exists():
        candidate = loops_dir / lib_path
        if candidate.exists():
            p = candidate
        else:
            builtin_candidate = get_builtin_loops_dir() / lib_path
            if builtin_candidate.exists():
                p = builtin_candidate
            else:
                logger.error(f"Fragment library not found: {lib_path}")
                return 1

    try:
        # Read as UTF-8 explicitly instead of relying on the platform's
        # locale encoding.
        with open(p, encoding="utf-8") as f:
            data = yaml.safe_load(f) or {}
    except Exception as e:
        logger.error(f"Failed to load library: {e}")
        return 1

    # A syntactically valid YAML file can still be a list or scalar; report
    # that instead of failing on attribute access below.
    if not isinstance(data, dict):
        logger.error(f"Failed to load library: top-level YAML value in {p} is not a mapping")
        return 1
    fragments = data.get("fragments") or {}
    if not isinstance(fragments, dict):
        logger.error(f"Failed to load library: 'fragments' in {p} is not a mapping")
        return 1
    if not fragments:
        print(f"No fragments defined in: {p}")
        return 0

    tw = terminal_width()
    print(colorize(f"Fragments in {p.name} ({len(fragments)}):", "1"))
    print()

    # YAML keys need not be strings (e.g. ``1:``); render them as text.
    max_name_len = max(len(str(name)) for name in fragments)
    name_col_w = max(max_name_len, 8)  # at least as wide as the "Fragment" header
    desc_col_w = max(20, tw - name_col_w - 6)

    header_name = "Fragment".ljust(name_col_w)
    print(f" {colorize(header_name, '1')} Description")
    print(f" {'─' * name_col_w} {'─' * desc_col_w}")

    for name, frag in fragments.items():
        raw_desc = frag.get("description") if isinstance(frag, dict) else ""
        # ``description:`` with no value loads as None.
        raw_desc = str(raw_desc or "").strip()
        first_line = raw_desc.splitlines()[0] if raw_desc else ""
        first_line = _truncate(first_line, desc_col_w)
        desc_str = first_line if first_line else colorize("(no description)", "2")
        # Pad before colorizing so escape codes do not eat into the width.
        padded_name = str(name).ljust(name_col_w)
        print(f" {colorize(padded_name, '36;1')} {desc_str}")

    return 0