Coverage for little_loops / cli / loop / info.py: 79%

646 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""ll-loop info subcommands: list, history, show.""" 

2 

3from __future__ import annotations 

4 

5import argparse 

6import os 

7import re 

8from datetime import datetime 

9from pathlib import Path 

10from typing import Any 

11 

12from little_loops.cli.loop._helpers import ( 

13 get_builtin_loops_dir, 

14 load_loop_with_spec, 

15 resolve_loop_path, 

16) 

17from little_loops.cli.loop.layout import ( # noqa: F401 

18 _EDGE_LABEL_COLORS, 

19 _box_inner_lines, 

20 _colorize_diagram_labels, 

21 _colorize_label, 

22 _render_fsm_diagram, 

23) 

24from little_loops.cli.output import colorize, print_json, terminal_width 

25from little_loops.fsm.schema import FSMLoop, StateConfig 

26from little_loops.fsm.validation import load_and_validate 

27from little_loops.logger import Logger 

28 

29 

30def _load_loop_meta(path: Path) -> dict[str, Any]: 

31 """Return metadata from a loop YAML file (description, category, labels).""" 

32 import yaml 

33 

34 try: 

35 with open(path) as f: 

36 spec = yaml.safe_load(f) or {} 

37 desc_raw = spec.get("description", "") or "" 

38 if desc_raw.strip(): 

39 raw_lines = desc_raw.splitlines() 

40 desc = raw_lines[0] 

41 if len(raw_lines) > 1: 

42 desc += "…" 

43 else: 

44 desc = "" 

45 category = spec.get("category", "") or "" 

46 labels: list[str] = spec.get("labels", []) or [] 

47 return {"description": desc, "category": category, "labels": labels} 

48 except Exception: 

49 return {"description": "", "category": "", "labels": []} 

50 

51 

52def cmd_list( 

53 args: argparse.Namespace, 

54 loops_dir: Path, 

55) -> int: 

56 """List loops.""" 

57 status_filter = getattr(args, "status", None) 

58 if getattr(args, "running", False) or status_filter: 

59 from little_loops.fsm.persistence import list_running_loops 

60 

61 states = list_running_loops(loops_dir) 

62 if status_filter: 

63 states = [s for s in states if s.status == status_filter] 

64 if not states: 

65 if status_filter: 

66 print(f"No loops with status: {status_filter}") 

67 return 1 

68 print("No running loops") 

69 return 0 

70 if getattr(args, "json", False): 

71 print_json([s.to_dict() for s in states]) 

72 return 0 

73 print(colorize("Running loops:", "1")) 

74 _STATUS_COLORS = {"running": "32", "interrupted": "33", "stopped": "2", "starting": "33"} 

75 

76 # Group by loop_name to avoid duplicate rows for multi-instance loops 

77 from collections import defaultdict 

78 

79 groups: dict[str, list] = defaultdict(list) 

80 for state in states: 

81 groups[state.loop_name].append(state) 

82 

83 for loop_name_key, group_states in groups.items(): 

84 if len(group_states) == 1: 

85 state = group_states[0] 

86 elapsed_s = (state.accumulated_ms or 0) // 1000 

87 elapsed_str = ( 

88 f"{elapsed_s}s" if elapsed_s < 60 else f"{elapsed_s // 60}m {elapsed_s % 60}s" 

89 ) 

90 name_str = colorize(state.loop_name, "1") 

91 state_str = colorize(state.current_state, "34") 

92 status_color = _STATUS_COLORS.get(state.status, "2") 

93 display_status = "paused" if state.status == "interrupted" else state.status 

94 status_str = colorize(f"[{display_status}]", status_color) 

95 elapsed_colored = colorize(elapsed_str, "2") 

96 print( 

97 f" {name_str}: {state_str} (iteration {state.iteration})" 

98 f" {status_str} {elapsed_colored}" 

99 ) 

100 else: 

101 # Multiple instances: show a grouped summary 

102 name_str = colorize(loop_name_key, "1") 

103 statuses = ", ".join( 

104 colorize( 

105 f"[{'paused' if s.status == 'interrupted' else s.status}]", 

106 _STATUS_COLORS.get(s.status, "2"), 

107 ) 

108 for s in group_states 

109 ) 

110 count_str = colorize(f"({len(group_states)} instances)", "2") 

111 print(f" {name_str}: {count_str} {statuses}") 

112 return 0 

113 

114 builtin_only = getattr(args, "builtin", False) 

115 

116 # Collect project loops (skipped when --builtin is set) 

117 project_names: set[str] = set() 

118 yaml_files: list[Path] = [] 

119 if not builtin_only and loops_dir.exists(): 

120 yaml_files = sorted(loops_dir.glob("*.yaml")) 

121 project_names = {p.stem for p in yaml_files} 

122 

123 # Collect built-in loops (excluding those overridden by project) 

124 builtin_dir = get_builtin_loops_dir() 

125 builtin_files: list[Path] = [] 

126 if builtin_dir.exists(): 

127 builtin_files = [ 

128 f for f in sorted(builtin_dir.glob("*.yaml")) if f.stem not in project_names 

129 ] 

130 

131 if not yaml_files and not builtin_files: 

132 if getattr(args, "json", False): 

133 print_json([]) 

134 return 0 

135 print("No loops available") 

136 return 0 

137 

138 # Build combined metadata list 

139 all_loops: list[dict[str, Any]] = [] 

140 for path in yaml_files: 

141 meta = _load_loop_meta(path) 

142 all_loops.append({"name": path.stem, "path": path, "builtin": False, **meta}) 

143 for path in builtin_files: 

144 meta = _load_loop_meta(path) 

145 all_loops.append({"name": path.stem, "path": path, "builtin": True, **meta}) 

146 

147 # Apply --category filter 

148 category_filter = getattr(args, "category", None) 

149 if category_filter: 

150 all_loops = [lp for lp in all_loops if lp["category"] == category_filter] 

151 

152 # Apply --label filter (action="append" → list or None) 

153 label_filters: list[str] = getattr(args, "label", None) or [] 

154 if label_filters: 

155 all_loops = [ 

156 lp 

157 for lp in all_loops 

158 if any(lf.lower() in [lb.lower() for lb in lp["labels"]] for lf in label_filters) 

159 ] 

160 

161 if not all_loops: 

162 if getattr(args, "json", False): 

163 print_json([]) 

164 return 0 

165 print("No loops match the given filters") 

166 return 0 

167 

168 if getattr(args, "json", False): 

169 items: list[dict[str, Any]] = [] 

170 for lp in all_loops: 

171 item: dict[str, Any] = { 

172 "name": lp["name"], 

173 "path": str(lp["path"]), 

174 "category": lp["category"], 

175 "labels": lp["labels"], 

176 } 

177 if lp["builtin"]: 

178 item["built_in"] = True 

179 items.append(item) 

180 print_json(items) 

181 return 0 

182 

183 # Human-readable: group by category 

184 buckets: dict[str, list[dict[str, Any]]] = {} 

185 for lp in all_loops: 

186 cat = lp["category"] or "uncategorized" 

187 if cat not in buckets: 

188 buckets[cat] = [] 

189 buckets[cat].append(lp) 

190 

191 # Sort categories; "uncategorized" always last 

192 sorted_cats = sorted(c for c in buckets if c != "uncategorized") 

193 if "uncategorized" in buckets: 

194 sorted_cats.append("uncategorized") 

195 

196 # Compute max name width for column alignment 

197 max_name_len = max((len(lp["name"]) for lp in all_loops), default=0) 

198 name_col = max_name_len + 2 # padding after longest name 

199 tw = terminal_width() 

200 

201 cats_printed = False 

202 for cat in sorted_cats: 

203 group = buckets[cat] 

204 if cats_printed: 

205 print() # blank line between category groups 

206 cats_printed = True 

207 print(colorize(f"{cat} ({len(group)}):", "1")) 

208 for lp in group: 

209 # Name: project loops get bold cyan, built-in loops get dimmer cyan 

210 name_color = "36" if lp["builtin"] else "36;1" 

211 name_str = colorize(lp["name"].ljust(name_col), name_color) 

212 

213 # Suffix: labels + [built-in] tag 

214 suffix_parts: list[str] = [] 

215 if lp["labels"]: 

216 for label in lp["labels"]: 

217 suffix_parts.append(colorize(f"[{label}]", "2")) 

218 if lp["builtin"]: 

219 suffix_parts.append(colorize("[built-in]", "2")) 

220 

221 if suffix_parts: 

222 suffix_raw = " " + " ".join(suffix_parts) 

223 suffix_visible = len(_strip_ansi(suffix_raw)) 

224 else: 

225 suffix_raw = "" 

226 suffix_visible = 0 

227 

228 # Available width for description: indent + name_col + " " + desc + suffix 

229 avail = tw - 2 - name_col - 2 - suffix_visible 

230 desc_text = lp["description"] or "" 

231 if desc_text and avail < len(desc_text): 

232 desc_text = _truncate(desc_text, max(avail, 20)) 

233 desc_str = f" {colorize(desc_text, '2')}" if desc_text else "" 

234 

235 print(f" {name_str}{desc_str}{suffix_raw}") 

236 return 0 

237 

238 

239_EVENT_TYPE_WIDTH = 16 # width of "handoff_detected" 

240 

241_ANSI_RE = re.compile(r"\033\[[0-9;]*m") 

242 

243 

244def _strip_ansi(text: str) -> str: 

245 return _ANSI_RE.sub("", text) 

246 

247 

248def _truncate(text: str, max_len: int) -> str: 

249 """Truncate text to max_len with ellipsis.""" 

250 if max_len < 1: 

251 return "" 

252 if len(text) <= max_len: 

253 return text 

254 return text[: max_len - 1] + "\u2026" 

255 

256 

257def _format_history_event( 

258 event: dict[str, Any], verbose: bool, width: int, full: bool = False 

259) -> str | None: 

260 """Format a single history event. Returns None to skip the event.""" 

261 raw_ts = event.get("ts", "") 

262 try: 

263 ts = datetime.fromisoformat(raw_ts).strftime("%H:%M:%S") 

264 except (ValueError, TypeError): 

265 ts = raw_ts[:8] if len(raw_ts) >= 8 else raw_ts.ljust(8) 

266 

267 event_type = event.get("event", "unknown") 

268 

269 if event_type == "action_output" and not verbose: 

270 return None 

271 

272 ts_str = colorize(ts, "2") 

273 etype_padded = event_type.ljust(_EVENT_TYPE_WIDTH) 

274 etype_color = "0" 

275 detail = "" 

276 extra_lines: list[str] = [] 

277 

278 # Indentation prefix for verbose sub-lines (aligns under event detail column) 

279 _indent = " " * (8 + 2 + _EVENT_TYPE_WIDTH + 2) 

280 

281 if event_type == "loop_start": 

282 etype_color = "1" 

283 detail = event.get("loop", "") 

284 

285 elif event_type == "loop_complete": 

286 etype_color = "1" 

287 final_state = event.get("final_state", "") 

288 iterations = event.get("iterations", "") 

289 terminated_by = event.get("terminated_by", "") 

290 detail = f"{final_state} {iterations} iter [{terminated_by}]" 

291 

292 elif event_type == "loop_resume": 

293 etype_color = "1" 

294 from_state = event.get("from_state", "") 

295 iteration = event.get("iteration", "") 

296 detail = f"from={from_state} iter={iteration}" 

297 

298 elif event_type == "state_enter": 

299 etype_color = "34" 

300 state = event.get("state", "") 

301 iteration = event.get("iteration", "") 

302 detail = f"{colorize(state, '1')} (iter {iteration})" 

303 

304 elif event_type == "action_start": 

305 action = event.get("action", "") 

306 is_prompt = event.get("is_prompt", False) 

307 kind_label = "prompt" if is_prompt else "shell" 

308 kind_str = colorize(f"[{kind_label}]", "2") 

309 first_line = ( 

310 next((ln.strip() for ln in action.splitlines() if ln.strip()), "") 

311 if is_prompt 

312 else action 

313 ) 

314 avail = width - 8 - 2 - _EVENT_TYPE_WIDTH - 2 - len(kind_label) - 2 - 2 

315 detail = f"{_truncate(first_line, max(avail, 20))} {kind_str}" 

316 

317 elif event_type == "action_output": 

318 # Only reached in verbose mode 

319 etype_color = "2" 

320 detail = colorize("\u2502 " + event.get("line", ""), "2") 

321 

322 elif event_type == "action_complete": 

323 exit_code = event.get("exit_code", 0) 

324 duration_ms = event.get("duration_ms", 0) 

325 if exit_code == 0: 

326 etype_color = "2" 

327 status_str = colorize("\u2713", "32") 

328 else: 

329 etype_color = "38;5;208" 

330 status_str = colorize(f"\u2717 exit={exit_code}", "38;5;208") 

331 detail = f"{status_str} {duration_ms}ms" 

332 is_prompt = event.get("is_prompt", False) 

333 session_jsonl = event.get("session_jsonl") if is_prompt else None 

334 if session_jsonl: 

335 session_display = session_jsonl if verbose else os.path.basename(session_jsonl) 

336 detail += f" session={colorize(session_display, '2')}" 

337 if verbose: 

338 output_preview = event.get("output_preview", "") 

339 if output_preview: 

340 avail_w = width - len(_indent) - 2 

341 preview_text = ( 

342 output_preview if full else _truncate(output_preview, max(avail_w, 40)) 

343 ) 

344 for preview_line in preview_text.splitlines()[:5]: 

345 extra_lines.append(colorize(_indent + "\u2502 " + preview_line, "2")) 

346 

347 elif event_type == "evaluate": 

348 verdict = event.get("verdict", "") 

349 confidence = event.get("confidence", "") 

350 reason = event.get("reason", "") 

351 if verdict == "yes": 

352 etype_color = "32" 

353 verdict_str = colorize("\u2713 yes", "32") 

354 else: 

355 etype_color = "38;5;208" 

356 verdict_str = colorize(f"\u2717 {verdict}", "38;5;208") 

357 conf_part = f" confidence={confidence}" if confidence != "" else "" 

358 avail = width - 8 - 2 - _EVENT_TYPE_WIDTH - 2 - len("\u2713 yes") - len(conf_part) - 2 

359 reason_part = f" {_truncate(reason, max(avail, 20))}" if reason else "" 

360 detail = f"{verdict_str}{conf_part}{reason_part}" 

361 if verbose: 

362 llm_model = event.get("llm_model", "") 

363 llm_latency_ms = event.get("llm_latency_ms", "") 

364 llm_prompt = event.get("llm_prompt", "") 

365 llm_raw_output = event.get("llm_raw_output", "") 

366 if llm_model or llm_prompt: 

367 meta_parts = [] 

368 if llm_model: 

369 meta_parts.append(f"model={llm_model}") 

370 if llm_latency_ms != "": 

371 meta_parts.append(f"latency={llm_latency_ms}ms") 

372 meta_str = " ".join(meta_parts) 

373 extra_lines.append( 

374 colorize(_indent + colorize("LLM Call", "2") + " " + meta_str, "2") 

375 ) 

376 avail_w = width - len(_indent) - len("Prompt: ") - 2 

377 if llm_prompt: 

378 prompt_text = llm_prompt if full else _truncate(llm_prompt, max(avail_w, 40)) 

379 extra_lines.append(colorize(_indent + "Prompt: " + prompt_text, "2")) 

380 if llm_raw_output: 

381 resp_text = ( 

382 llm_raw_output if full else _truncate(llm_raw_output, max(avail_w, 40)) 

383 ) 

384 extra_lines.append(colorize(_indent + "Response: " + resp_text, "2")) 

385 

386 elif event_type == "route": 

387 etype_color = "2" 

388 from_state = event.get("from", "") 

389 to_state = event.get("to", "") 

390 detail = f"{from_state} \u2192 {colorize(to_state, '34')}" 

391 

392 elif event_type == "handoff_detected": 

393 etype_color = "33" 

394 detail = f"state={event.get('state', '')} iter={event.get('iteration', '')}" 

395 

396 else: 

397 details = {k: v for k, v in event.items() if k not in ("event", "ts")} 

398 detail = " ".join(f"{k}={v}" for k, v in details.items()) 

399 

400 etype_str = colorize(etype_padded, etype_color) 

401 main_line = f"{ts_str} {etype_str} {detail}" 

402 if extra_lines: 

403 return "\n".join([main_line] + extra_lines) 

404 return main_line 

405 

406 

407def _format_duration(ms: int) -> str: 

408 """Format milliseconds as a human-readable duration.""" 

409 if ms < 1000: 

410 return f"{ms}ms" 

411 s = ms // 1000 

412 if s < 60: 

413 return f"{s}s" 

414 m, s = divmod(s, 60) 

415 if m < 60: 

416 return f"{m}m{s:02d}s" 

417 h, m = divmod(m, 60) 

418 return f"{h}h{m:02d}m{s:02d}s" 

419 

420 

421def _list_archived_runs(loop_name: str, loops_dir: Path, as_json: bool) -> int: 

422 """List archived runs for a loop.""" 

423 import json as _json 

424 

425 from little_loops.fsm.persistence import HISTORY_DIR, LoopState 

426 

427 history_base = loops_dir / HISTORY_DIR 

428 if not history_base.exists(): 

429 print(f"No history for: {loop_name}") 

430 return 0 

431 

432 # Flat layout: run dirs are <run_id>-<loop_name> directly under .history/ 

433 suffix = f"-{loop_name}" 

434 runs: list[tuple[str, LoopState | None]] = [] 

435 for run_dir in sorted(history_base.iterdir(), key=lambda d: d.name, reverse=True): 

436 if not run_dir.is_dir() or not run_dir.name.endswith(suffix): 

437 continue 

438 run_id = run_dir.name[: -len(suffix)] 

439 state_file = run_dir / "state.json" 

440 state: LoopState | None = None 

441 if state_file.exists(): 

442 try: 

443 data = _json.loads(state_file.read_text()) 

444 state = LoopState.from_dict(data) 

445 except (ValueError, KeyError): 

446 pass 

447 runs.append((run_id, state)) 

448 

449 if not runs: 

450 print(f"No history for: {loop_name}") 

451 return 0 

452 

453 if as_json: 

454 print( 

455 _json.dumps( 

456 [ 

457 { 

458 "run_id": rid, 

459 "status": s.status if s else None, 

460 "started_at": s.started_at if s else None, 

461 "iterations": s.iteration if s else None, 

462 "duration_ms": s.accumulated_ms if s else None, 

463 } 

464 for rid, s in runs 

465 ], 

466 indent=2, 

467 ) 

468 ) 

469 return 0 

470 

471 status_colors = { 

472 "completed": "\033[32m", 

473 "failed": "\033[31m", 

474 "interrupted": "\033[33m", 

475 "awaiting_continuation": "\033[36m", 

476 "timed_out": "\033[33m", 

477 "running": "\033[34m", 

478 } 

479 reset = "\033[0m" 

480 

481 print(f"Archived runs for: {loop_name} ({len(runs)} total)") 

482 print() 

483 

484 for run_id, state in runs: 

485 if state is not None: 

486 color = status_colors.get(state.status, "") 

487 status_str = f"{color}{state.status}{reset}" 

488 duration_str = _format_duration(state.accumulated_ms) if state.accumulated_ms else "?" 

489 started = state.started_at[:19].replace("T", " ") if state.started_at else "?" 

490 iters = f"{state.iteration} iters" 

491 else: 

492 status_str = "unknown" 

493 duration_str = "?" 

494 started = "?" 

495 iters = "?" 

496 print(f" {run_id} {status_str} {started} {iters} {duration_str}") 

497 

498 print() 

499 print(f"To view events: ll-loop history {loop_name} <run-id>") 

500 return 0 

501 

502 

503def cmd_history( 

504 loop_name: str, 

505 run_id: str | None, 

506 args: argparse.Namespace, 

507 loops_dir: Path, 

508) -> int: 

509 """Show loop history. 

510 

511 Without run_id: lists all archived runs with status and duration. 

512 With run_id: shows events for that specific archived run. 

513 """ 

514 tail = getattr(args, "tail", 50) 

515 full = getattr(args, "full", False) 

516 verbose = getattr(args, "verbose", False) or full 

517 as_json = getattr(args, "json", False) 

518 

519 if run_id is None: 

520 return _list_archived_runs(loop_name, loops_dir, as_json) 

521 

522 # Show events for a specific archived run 

523 from little_loops.fsm.persistence import get_archived_events 

524 

525 events = get_archived_events(loop_name, run_id, loops_dir) 

526 

527 if not events: 

528 print(f"No events found for run {run_id} of loop {loop_name}") 

529 return 1 

530 

531 w = terminal_width() 

532 if not verbose: 

533 events = [e for e in events if e.get("event") != "action_output"] 

534 

535 # Apply optional filters (before --tail slice) 

536 event_filter = getattr(args, "event", None) 

537 if event_filter: 

538 events = [e for e in events if e.get("event") == event_filter] 

539 

540 state_filter = getattr(args, "state", None) 

541 if state_filter: 

542 events = [ 

543 e 

544 for e in events 

545 if ( 

546 e.get("state") == state_filter 

547 or e.get("from") == state_filter 

548 or e.get("to") == state_filter 

549 ) 

550 ] 

551 

552 since_str = getattr(args, "since", None) 

553 if since_str: 

554 from datetime import timedelta 

555 

556 from little_loops.text_utils import parse_duration 

557 

558 cutoff = datetime.now() - timedelta(seconds=parse_duration(since_str)) 

559 events = [ 

560 e 

561 for e in events 

562 if datetime.fromisoformat(e["ts"].replace("Z", "+00:00")).replace(tzinfo=None) >= cutoff 

563 ] 

564 

565 if as_json: 

566 print_json(events[-tail:]) 

567 return 0 

568 for event in events[-tail:]: 

569 line = _format_history_event(event, verbose, w, full=full) 

570 if line is not None: 

571 print(line) 

572 

573 return 0 

574 

575 

576# --------------------------------------------------------------------------- 

577# FSM diagram renderer — delegated to layout module (re-exported above) 

578# --------------------------------------------------------------------------- 

579 

580 

581# --------------------------------------------------------------------------- 

582# State overview table 

583# --------------------------------------------------------------------------- 

584 

585 

586def _compact_transitions(state: StateConfig) -> str: 

587 """Return a compact transition string for the overview table.""" 

588 raw: list[tuple[str, str]] = [] 

589 for label, target in [ 

590 ("yes", state.on_yes), 

591 ("no", state.on_no), 

592 ("error", state.on_error), 

593 ("partial", state.on_partial), 

594 ("next", state.next), 

595 ]: 

596 if target: 

597 raw.append((label, target)) 

598 if state.route: 

599 for verdict, target in state.route.routes.items(): 

600 raw.append((verdict, target)) 

601 if state.route.default: 

602 raw.append(("_", state.route.default)) 

603 if not raw: 

604 return "\u2014" 

605 # Group by target, preserving first-seen order 

606 seen: list[str] = [] 

607 by_target: dict[str, list[str]] = {} 

608 for label, target in raw: 

609 if target not in by_target: 

610 seen.append(target) 

611 by_target[target] = [] 

612 by_target[target].append(label) 

613 return ", ".join(f"{'/'.join(by_target[t])}\u2192{t}" for t in seen) 

614 

615 

616def _print_state_overview_table(fsm: FSMLoop) -> None: 

617 """Print a compact summary table of all states.""" 

618 rows: list[tuple[str, str, str, str]] = [] 

619 for name, state in fsm.states.items(): 

620 # State name column 

621 state_col = f"\u2192 {name}" if name == fsm.initial else f" {name}" 

622 

623 # Type column 

624 if state.terminal: 

625 type_col = "\u2014" 

626 elif state.action_type: 

627 type_col = state.action_type 

628 elif state.action: 

629 type_col = "shell" 

630 else: 

631 type_col = "\u2014" 

632 

633 # Action preview column 

634 if state.terminal: 

635 action_col = "(terminal)" 

636 elif state.action: 

637 src_lines = [ln.rstrip() for ln in state.action.strip().splitlines() if ln.rstrip()] 

638 action_col = src_lines[0] if src_lines else "\u2014" 

639 else: 

640 action_col = "\u2014" 

641 

642 # Transitions column 

643 trans_col = _compact_transitions(state) 

644 rows.append((state_col, type_col, action_col, trans_col)) 

645 

646 if not rows: 

647 return 

648 

649 tw = terminal_width() 

650 headers = ("State", "Type", "Action Preview", "Transitions") 

651 col0_w = max(len(headers[0]), max(len(r[0]) for r in rows)) 

652 col1_w = max(len(headers[1]), max(len(r[1]) for r in rows)) 

653 # Remaining width split between action preview and transitions 

654 fixed = col0_w + col1_w + 10 # margins + separators 

655 remaining = max(20, tw - fixed) 

656 col2_w = min(50, max(len(headers[2]), max(len(r[2]) for r in rows)), remaining * 3 // 5) 

657 col2_w = max(10, col2_w) 

658 col3_w = max(10, remaining - col2_w) 

659 

660 print(f" {headers[0]:<{col0_w}} {headers[1]:<{col1_w}} {headers[2]:<{col2_w}} {headers[3]}") 

661 dash = "\u2500" 

662 print(f" {dash * col0_w} {dash * col1_w} {dash * col2_w} {dash * col3_w}") 

663 for state_col, type_col, action_col, trans_col in rows: 

664 if len(action_col) > col2_w: 

665 action_col = action_col[: col2_w - 1] + "\u2026" 

666 if len(trans_col) > col3_w: 

667 trans_col = trans_col[: col3_w - 1] + "\u2026" 

668 colored_type = colorize(type_col, "2") if type_col == "\u2014" else type_col 

669 print( 

670 f" {state_col:<{col0_w}} {colored_type:<{col1_w}} " 

671 f"{action_col:<{col2_w}} {trans_col}" 

672 ) 

673 

674 

675# --------------------------------------------------------------------------- 

676# cmd_show 

677# --------------------------------------------------------------------------- 

678 

679_EVALUATE_TYPE_DISPLAY: dict[str, str] = { 

680 "llm": "LLM", 

681 "llm_structured": "LLM (structured)", 

682 "exit_code": "exit code", 

683 "output_numeric": "numeric", 

684 "output_contains": "contains", 

685 "output_json": "JSON", 

686 "convergence": "convergence", 

687} 

688 

689 

690def _humanize_evaluate_type(ev_type: str) -> str: 

691 return _EVALUATE_TYPE_DISPLAY.get(ev_type, ev_type) 

692 

693 

694def cmd_show( 

695 loop_name: str, 

696 args: argparse.Namespace, 

697 loops_dir: Path, 

698 logger: Logger, 

699) -> int: 

700 """Show loop details and structure.""" 

701 try: 

702 path = resolve_loop_path(loop_name, loops_dir) 

703 fsm, spec = load_loop_with_spec(loop_name, loops_dir, logger) 

704 except FileNotFoundError as e: 

705 logger.error(str(e)) 

706 return 1 

707 except ValueError as e: 

708 logger.error(f"Invalid loop: {e}") 

709 return 1 

710 

711 if getattr(args, "json", False): 

712 data = fsm.to_dict() 

713 if getattr(args, "resolved", False): 

714 for state in data.get("states", {}).values(): 

715 if "loop" in state: 

716 try: 

717 child_path = resolve_loop_path(state["loop"], loops_dir) 

718 child_fsm, _ = load_and_validate(child_path) 

719 state["_subloop"] = child_fsm.to_dict().get("states", {}) 

720 except (FileNotFoundError, ValueError): 

721 pass 

722 print_json(data) 

723 return 0 

724 

725 tw = terminal_width() 

726 

727 # Compute stats for header 

728 n_states = len(fsm.states) 

729 n_transitions = sum( 

730 bool(s.on_yes) 

731 + bool(s.on_no) 

732 + bool(s.on_error) 

733 + bool(s.on_partial) 

734 + bool(s.next) 

735 + bool(s.on_maintain) 

736 + (len(s.route.routes) + bool(s.route.default) if s.route else 0) 

737 for s in fsm.states.values() 

738 ) 

739 

740 # --- Compact metadata header --- 

741 # Line 1: ── name ───────── N states · M transitions ── 

742 stats_parts: list[str] = [] 

743 stats_parts.append(f"{n_states} states") 

744 stats_parts.append(f"{n_transitions} transitions") 

745 stats_str = " \u00b7 ".join(stats_parts) 

746 

747 header_left = f"\u2500\u2500 {loop_name} " 

748 header_right = f" {stats_str} \u2500\u2500" 

749 dashes = "\u2500" * max(0, tw - len(header_left) - len(header_right)) 

750 print(f"{header_left}{dashes}{header_right}") 

751 

752 # Line 2: source · max: N iter · handoff: X [· optional fields] 

753 config_parts: list[str] = [str(path), f"max: {fsm.max_iterations} iter"] 

754 config_parts.append(f"handoff: {fsm.on_handoff}") 

755 if fsm.timeout: 

756 config_parts.append(f"timeout: {fsm.timeout}s") 

757 if fsm.backoff: 

758 config_parts.append(f"backoff: {fsm.backoff}s") 

759 if fsm.maintain: 

760 config_parts.append("maintain: yes") 

761 if fsm.context: 

762 config_parts.append(f"context: {', '.join(fsm.context.keys())}") 

763 if fsm.scope: 

764 config_parts.append(f"scope: {', '.join(fsm.scope)}") 

765 llm = fsm.llm 

766 llm_parts = [] 

767 if llm.model != "sonnet": 

768 llm_parts.append(f"model={llm.model}") 

769 if llm.max_tokens != 256: 

770 llm_parts.append(f"max_tokens={llm.max_tokens}") 

771 if llm.timeout != 30: 

772 llm_parts.append(f"timeout={llm.timeout}s") 

773 if llm_parts: 

774 config_parts.append(f"llm: {', '.join(llm_parts)}") 

775 if fsm.config is not None: 

776 cfg_parts = [] 

777 if fsm.config.handoff_threshold is not None: 

778 cfg_parts.append(f"handoff_threshold={fsm.config.handoff_threshold}") 

779 if fsm.config.readiness_threshold is not None: 

780 cfg_parts.append(f"readiness_threshold={fsm.config.readiness_threshold}") 

781 if fsm.config.outcome_threshold is not None: 

782 cfg_parts.append(f"outcome_threshold={fsm.config.outcome_threshold}") 

783 if fsm.config.max_continuations is not None: 

784 cfg_parts.append(f"max_continuations={fsm.config.max_continuations}") 

785 if cfg_parts: 

786 config_parts.append(f"config: {', '.join(cfg_parts)}") 

787 imports = spec.get("import", []) 

788 if imports: 

789 config_parts.append(f"imports: {', '.join(imports)}") 

790 print(" " + " \u00b7 ".join(config_parts)) 

791 

792 # --- Description --- 

793 description = spec.get("description", "").strip() 

794 if description: 

795 print() 

796 print("Description:") 

797 for line in description.splitlines(): 

798 print(f" {line}") 

799 

800 # --- ASCII FSM Diagram --- 

801 verbose = getattr(args, "verbose", False) 

802 from pathlib import Path 

803 

804 from little_loops.config import BRConfig 

805 

806 badges = BRConfig(Path.cwd()).loops.glyphs.to_dict() 

807 print() 

808 print("Diagram:") 

809 diagram = _render_fsm_diagram(fsm, verbose=verbose, badges=badges) 

810 if diagram: 

811 print(diagram) 

812 

813 # --- State overview table --- 

814 print() 

815 _print_state_overview_table(fsm) 

816 

817 # --- States & Transitions (verbose only) --- 

818 if verbose: 

819 print() 

820 print("States:") 

821 first_state = True 

822 for name, state in fsm.states.items(): 

823 if not first_state: 

824 print() 

825 first_state = False 

826 

827 # Improved state section header: ── name ──── MARKERS · type ── 

828 right_parts = [] 

829 if name == fsm.initial: 

830 right_parts.append("INITIAL") 

831 if state.terminal: 

832 right_parts.append("TERMINAL") 

833 if state.action_type: 

834 right_parts.append(state.action_type) 

835 right_info = " \u00b7 ".join(right_parts) 

836 inner_left = f"\u2500\u2500 {name} " 

837 inner_right = f" {right_info} \u2500\u2500" if right_info else " \u2500\u2500" 

838 fill = "\u2500" * max(0, tw - 2 - len(inner_left) - len(inner_right)) 

839 print(f" {inner_left}{fill}{inner_right}") 

840 

841 if state.action: 

842 if verbose: 

843 indented = "\n ".join(state.action.strip().splitlines()) 

844 print(f" action:\n {indented}") 

845 elif state.action_type == "prompt": 

846 lines_act = state.action.strip().splitlines() 

847 preview = "\n ".join(lines_act[:3]) 

848 if len(lines_act) > 3 or len(state.action) > 200: 

849 preview += " ..." 

850 print(f" action:\n {preview}") 

851 else: # shell, slash_command, or None 

852 action_display = ( 

853 state.action[:70] + "..." if len(state.action) > 70 else state.action 

854 ) 

855 print(f" action: {action_display}") 

856 if state.evaluate: 

857 ev = state.evaluate 

858 print(f" evaluate: {_humanize_evaluate_type(ev.type)}") 

859 if ev.prompt: 

860 if verbose: 

861 print(" prompt:") 

862 for line in ev.prompt.strip().splitlines(): 

863 print(f" \u2502 {line}") 

864 else: 

865 ev_lines = ev.prompt.strip().splitlines() 

866 preview = ev_lines[0][:100] + ( 

867 " ..." if len(ev_lines) > 1 or len(ev_lines[0]) > 100 else "" 

868 ) 

869 print(f" prompt: {preview}") 

870 if ev.min_confidence != 0.5: 

871 print(f" min_confidence: {ev.min_confidence}") 

872 if ev.operator: 

873 print(f" operator: {ev.operator} {ev.target}") 

874 if ev.pattern: 

875 print(f" pattern: {ev.pattern}") 

876 if state.capture: 

877 print(f" capture: {state.capture}") 

878 if state.timeout: 

879 print(f" timeout: {state.timeout}s") 

880 # Collect (label, target) pairs 

881 raw_transitions: list[tuple[str, str]] = [] 

882 for label, target in [ 

883 ("yes", state.on_yes), 

884 ("no", state.on_no), 

885 ("error", state.on_error), 

886 ("partial", state.on_partial), 

887 ("next", state.next), 

888 ("maintain", state.on_maintain), 

889 ]: 

890 if target: 

891 raw_transitions.append((label, target)) 

892 if state.route: 

893 for verdict, target in state.route.routes.items(): 

894 raw_transitions.append((verdict, target)) 

895 if state.route.default: 

896 raw_transitions.append(("_", state.route.default)) 

897 # Group by target, preserving first-seen order 

898 target_labels: dict[str, list[str]] = {} 

899 seen_targets: list[str] = [] 

900 for label, target in raw_transitions: 

901 if target not in target_labels: 

902 target_labels[target] = [] 

903 seen_targets.append(target) 

904 target_labels[target].append(label) 

905 transitions = [ 

906 f"{_colorize_label('/'.join(target_labels[t]))} \u2500\u2500\u2192 {t}" 

907 for t in seen_targets 

908 ] 

909 if transitions: 

910 print(" Transitions:") 

911 for t in transitions: 

912 print(f" {t}") 

913 

914 # --- Commands --- 

915 print() 

916 print("Commands:") 

917 if fsm.commands: 

918 cmds = [(e.cmd, e.comment) for e in fsm.commands] 

919 else: 

920 cmds = [ 

921 (f"ll-loop run {loop_name}", "run"), 

922 (f"ll-loop test {loop_name}", "single test iteration"), 

923 (f"ll-loop stop {loop_name}", "stop a running loop"), 

924 (f"ll-loop status {loop_name}", "check if running"), 

925 (f"ll-loop history {loop_name}", "execution history"), 

926 ] 

927 col_width = max(len(c) for c, _ in cmds) + 2 

928 for cmd, comment in cmds: 

929 print(f" {cmd:<{col_width}} # {comment}") 

930 

931 return 0 

932 

933 

934def cmd_fragments( 

935 lib_path: str, 

936 args: argparse.Namespace, 

937 loops_dir: Path, 

938 logger: Logger, 

939) -> int: 

940 """List fragments in a library file with their descriptions. 

941 

942 Resolves the library path relative to loops_dir or the built-in loops directory, 

943 then prints a table of fragment names and their description fields. 

944 """ 

945 import yaml 

946 

947 # Resolve path: absolute/direct → loops_dir-relative → builtin 

948 p = Path(lib_path) 

949 if not p.exists(): 

950 candidate = loops_dir / lib_path 

951 if candidate.exists(): 

952 p = candidate 

953 else: 

954 builtin_candidate = get_builtin_loops_dir() / lib_path 

955 if builtin_candidate.exists(): 

956 p = builtin_candidate 

957 else: 

958 logger.error(f"Fragment library not found: {lib_path}") 

959 return 1 

960 

961 try: 

962 with open(p) as f: 

963 data = yaml.safe_load(f) or {} 

964 except Exception as e: 

965 logger.error(f"Failed to load library: {e}") 

966 return 1 

967 

968 fragments: dict[str, Any] = data.get("fragments", {}) 

969 if not fragments: 

970 print(f"No fragments defined in: {p}") 

971 return 0 

972 

973 tw = terminal_width() 

974 print(colorize(f"Fragments in {p.name} ({len(fragments)}):", "1")) 

975 print() 

976 

977 max_name_len = max(len(name) for name in fragments) 

978 name_col_w = max(max_name_len, 8) 

979 desc_col_w = max(20, tw - name_col_w - 6) 

980 

981 header_name = "Fragment".ljust(name_col_w) 

982 print(f" {colorize(header_name, '1')} Description") 

983 print(f" {'─' * name_col_w} {'─' * desc_col_w}") 

984 

985 for name, frag in fragments.items(): 

986 raw_desc = frag.get("description", "") if isinstance(frag, dict) else "" 

987 raw_desc = raw_desc.strip() 

988 first_line = raw_desc.splitlines()[0] if raw_desc else "" 

989 if first_line and len(first_line) > desc_col_w: 

990 first_line = first_line[: desc_col_w - 1] + "\u2026" 

991 desc_str = first_line if first_line else colorize("(no description)", "2") 

992 padded_name = name.ljust(name_col_w) 

993 print(f" {colorize(padded_name, '36;1')} {desc_str}") 

994 

995 return 0