Coverage for little_loops / cli / loop / _helpers.py: 96%

366 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""Shared helpers for ll-loop CLI subcommands.""" 

2 

3from __future__ import annotations 

4 

5import argparse 

6import json 

7import signal 

8import subprocess 

9import sys 

10import time 

11from datetime import datetime 

12from pathlib import Path 

13from types import FrameType 

14from typing import TYPE_CHECKING, Any 

15 

16from little_loops.cli.output import colorize, terminal_width 

17from little_loops.fsm.concurrency import _process_alive 

18from little_loops.logger import Logger 

19 

20if TYPE_CHECKING: 

21 from little_loops.fsm.schema import FSMLoop 

22 

23# Exit code mapping for terminated_by values 

24EXIT_CODES: dict[str, int] = { 

25 "terminal": 0, 

26 "signal": 0, 

27 "handoff": 0, 

28 "max_iterations": 1, 

29 "timeout": 1, 

30 "cycle_detected": 1, 

31} 

32 

33# Module-level shutdown state for signal handling 

34_loop_shutdown_requested: bool = False 

35_loop_executor: Any = None 

36_loop_pid_file: Path | None = None 

37_using_alt_screen: bool = False 

38 

39 

40def _loop_signal_handler(signum: int, frame: FrameType | None) -> None: 

41 """Handle shutdown signals gracefully for ll-loop. 

42 

43 First signal: Set shutdown flag for graceful exit after current state. 

44 Second signal: Force immediate exit. 

45 """ 

46 global _loop_shutdown_requested, _using_alt_screen 

47 if _loop_shutdown_requested: 

48 # Second signal - force exit 

49 if _loop_pid_file is not None: 

50 _loop_pid_file.unlink(missing_ok=True) 

51 if _using_alt_screen: 

52 print("\033[?1049l", end="", file=sys.stderr, flush=True) 

53 print(colorize("\nForce shutdown requested", "38;5;208"), file=sys.stderr) 

54 sys.exit(1) 

55 _loop_shutdown_requested = True 

56 print(colorize("\nShutdown requested, will exit after current state...", "33"), file=sys.stderr) 

57 if _loop_executor is not None: 

58 _loop_executor.request_shutdown() 

59 # Kill any child subprocess currently blocking in the action runner 

60 inner = getattr(_loop_executor, "_executor", None) 

61 if inner is not None: 

62 runner = getattr(inner, "action_runner", None) 

63 if runner is not None: 

64 proc = getattr(runner, "_current_process", None) 

65 if proc is not None: 

66 proc.kill() 

67 # Also kill MCP subprocesses tracked directly on FSMExecutor (_run_subprocess path) 

68 fsm_proc = getattr(inner, "_current_process", None) 

69 if fsm_proc is not None: 

70 fsm_proc.kill() 

71 

72 

73def _is_earliest_waiter(entry_id: str, queue_dir: Path) -> bool: 

74 """Return True if entry_id is the earliest-enqueued waiter in queue_dir. 

75 

76 Returns True when this waiter is first or the queue is empty/unreadable, 

77 allowing it to proceed with acquire(). Non-first waiters return False and 

78 should back off to yield to the earlier waiter (ENH-1332). 

79 

80 Stale entries (dead PIDs) are removed on the fly so orphaned queue files 

81 from crashed processes do not block live waiters indefinitely (BUG-1360). 

82 """ 

83 if not queue_dir.exists(): 

84 return True 

85 entries: list[dict] = [] 

86 for f in queue_dir.glob("*.json"): 

87 try: 

88 with open(f) as fh: 

89 data = json.load(fh) 

90 pid = data.get("context", {}).get("pid") 

91 if pid is not None and not _process_alive(pid): 

92 f.unlink(missing_ok=True) 

93 continue 

94 entries.append(data) 

95 except (json.JSONDecodeError, KeyError, FileNotFoundError, OSError): 

96 continue 

97 if not entries: 

98 return True 

99 entries.sort(key=lambda d: d.get("enqueuedAt", "")) 

100 return entries[0].get("id") == entry_id 

101 

102 

103def register_loop_signal_handlers(executor: Any, pid_file: Path | None = None) -> None: 

104 """Register SIGINT/SIGTERM handlers for graceful loop shutdown. 

105 

106 Sets up signal handling so that Ctrl-C triggers a graceful shutdown 

107 (calls executor.request_shutdown()) rather than raising KeyboardInterrupt. 

108 A second Ctrl-C forces immediate exit with PID file cleanup. 

109 

110 Args: 

111 executor: The PersistentExecutor instance to request shutdown on. 

112 pid_file: Optional path to PID file to clean up on forced exit. 

113 """ 

114 global _loop_shutdown_requested, _loop_executor, _loop_pid_file 

115 _loop_shutdown_requested = False 

116 _loop_executor = executor 

117 _loop_pid_file = pid_file 

118 signal.signal(signal.SIGINT, _loop_signal_handler) 

119 signal.signal(signal.SIGTERM, _loop_signal_handler) 

120 

121 

122def get_builtin_loops_dir() -> Path: 

123 """Get the path to built-in loops bundled with the plugin.""" 

124 return Path(__file__).parent.parent.parent / "loops" 

125 

126 

127def resolve_loop_path(name_or_path: str, loops_dir: Path) -> Path: 

128 """Resolve loop name to file path.""" 

129 path = Path(name_or_path) 

130 if path.exists(): 

131 return path 

132 

133 # Try <loops_dir>/<name>.fsm.yaml first (compiled FSM) 

134 fsm_path = loops_dir / f"{name_or_path}.fsm.yaml" 

135 if fsm_path.exists(): 

136 return fsm_path 

137 

138 # Fall back to <loops_dir>/<name>.yaml 

139 loops_path = loops_dir / f"{name_or_path}.yaml" 

140 if loops_path.exists(): 

141 return loops_path 

142 

143 # Fall back to built-in loops from plugin directory 

144 builtin_path = get_builtin_loops_dir() / f"{name_or_path}.yaml" 

145 if builtin_path.exists(): 

146 return builtin_path 

147 

148 raise FileNotFoundError(f"Loop not found: {name_or_path}") 

149 

150 

151def load_loop(name_or_path: str, loops_dir: Path, logger: Logger) -> FSMLoop: 

152 """Load and validate a loop. 

153 

154 Raises: 

155 FileNotFoundError: If loop not found. 

156 ValueError: If loop is invalid. 

157 """ 

158 from little_loops.fsm.validation import load_and_validate 

159 

160 path = resolve_loop_path(name_or_path, loops_dir) 

161 fsm, _ = load_and_validate(path) 

162 return fsm 

163 

164 

165def load_loop_with_spec( 

166 name_or_path: str, loops_dir: Path, logger: Logger 

167) -> tuple[FSMLoop, dict[str, Any]]: 

168 """Load a loop and return both the FSMLoop and raw spec dict. 

169 

170 Used by commands that need access to raw YAML fields (e.g., description). 

171 

172 Raises: 

173 FileNotFoundError: If loop not found. 

174 ValueError: If loop is invalid. 

175 """ 

176 import yaml 

177 

178 from little_loops.fsm.validation import load_and_validate 

179 

180 path = resolve_loop_path(name_or_path, loops_dir) 

181 

182 with open(path) as f: 

183 spec = yaml.safe_load(f) 

184 

185 fsm, _ = load_and_validate(path) 

186 return fsm, spec 

187 

188 

189def print_execution_plan(fsm: FSMLoop, edge_label_colors: dict[str, str] | None = None) -> None: 

190 """Print dry-run execution plan.""" 

191 _elc = edge_label_colors or {} 

192 _yes_color = _elc.get("yes", "32") 

193 tw = terminal_width() 

194 print(colorize(f"Execution plan for: {fsm.name}", "1")) 

195 print() 

196 print("States:") 

197 for name, state in fsm.states.items(): 

198 terminal_marker = colorize(" [TERMINAL]", _yes_color) if state.terminal else "" 

199 print(f" {colorize(f'[{name}]', '1')}{terminal_marker}") 

200 if state.action: 

201 if state.action_type == "prompt": 

202 lines = state.action.strip().splitlines() 

203 preview = "\n ".join(lines[:3]) 

204 if len(lines) > 3 or len(state.action) > 200: 

205 preview += " ..." 

206 print(f" action: |\n {preview}") 

207 else: 

208 max_action = tw - 16 

209 action_display = ( 

210 state.action[:max_action] + "..." 

211 if len(state.action) > max_action 

212 else state.action 

213 ) 

214 print(f" action: {action_display}") 

215 if state.evaluate: 

216 print(f" evaluate: {state.evaluate.type}") 

217 if state.on_yes: 

218 print(f" on_yes {colorize('->', '2')} {colorize(state.on_yes, '2')}") 

219 if state.on_no: 

220 print(f" on_no {colorize('->', '2')} {colorize(state.on_no, '2')}") 

221 if state.on_error: 

222 print(f" on_error {colorize('->', '2')} {colorize(state.on_error, '2')}") 

223 if state.next: 

224 print(f" next {colorize('->', '2')} {colorize(state.next, '2')}") 

225 if state.route: 

226 print(" route:") 

227 for verdict, target in state.route.routes.items(): 

228 print(f" {verdict} {colorize('->', '2')} {colorize(target, '2')}") 

229 if state.route.default: 

230 print(f" _ {colorize('->', '2')} {colorize(state.route.default, '2')}") 

231 print() 

232 print(f"Initial state: {fsm.initial}") 

233 print(f"Max iterations: {fsm.max_iterations}") 

234 if fsm.timeout: 

235 print(f"Timeout: {fsm.timeout}s") 

236 if fsm.context: 

237 print("Context:") 

238 for key, value in fsm.context.items(): 

239 print(f" {key}: {value!r}") 

240 

241 

242def _make_instance_id(loop_name: str) -> str: 

243 """Generate a unique instance ID for a loop run.""" 

244 return f"{loop_name}-{datetime.now().strftime('%Y%m%dT%H%M%S')}" 

245 

246 

247def run_background( 

248 loop_name: str, args: argparse.Namespace, loops_dir: Path, subcommand: str = "run" 

249) -> int: 

250 """Launch loop as a detached background process. 

251 

252 Spawns a new process with start_new_session=True that re-executes 

253 the loop with --foreground-internal. The parent writes the PID file 

254 and returns immediately. 

255 

256 Args: 

257 subcommand: The ll-loop subcommand to spawn ("run" or "resume"). 

258 

259 Returns: 

260 Exit code (0 = launched successfully). 

261 """ 

262 running_dir = loops_dir / ".running" 

263 running_dir.mkdir(parents=True, exist_ok=True) 

264 

265 instance_id = _make_instance_id(loop_name) 

266 pid_file = running_dir / f"{instance_id}.pid" 

267 log_file = running_dir / f"{instance_id}.log" 

268 

269 # Build re-exec command with --foreground-internal instead of --background 

270 cmd = [ 

271 sys.executable, 

272 "-m", 

273 "little_loops.cli.loop", 

274 subcommand, 

275 loop_name, 

276 ] 

277 input_val = getattr(args, "input", None) 

278 if input_val is not None: 

279 cmd.append(input_val) 

280 cmd.append("--foreground-internal") 

281 cmd.extend(["--instance-id", instance_id]) 

282 

283 # Forward relevant args 

284 max_iter = getattr(args, "max_iterations", None) 

285 if max_iter: 

286 cmd.extend(["--max-iterations", str(max_iter)]) 

287 if getattr(args, "no_llm", False): 

288 cmd.append("--no-llm") 

289 llm_model = getattr(args, "llm_model", None) 

290 if llm_model: 

291 cmd.extend(["--llm-model", llm_model]) 

292 if getattr(args, "verbose", False): 

293 cmd.append("--verbose") 

294 if getattr(args, "show_diagrams", False): 

295 cmd.append("--show-diagrams") 

296 if getattr(args, "quiet", False): 

297 cmd.append("--quiet") 

298 if getattr(args, "queue", False): 

299 cmd.append("--queue") 

300 for kv in getattr(args, "context", None) or []: 

301 cmd.extend(["--context", kv]) 

302 program_md = getattr(args, "program_md", None) 

303 if program_md is not None: 

304 cmd.extend(["--program-md", str(program_md)]) 

305 delay = getattr(args, "delay", None) 

306 if delay is not None: 

307 cmd.extend(["--delay", str(delay)]) 

308 handoff_threshold = getattr(args, "handoff_threshold", None) 

309 if handoff_threshold is not None: 

310 cmd.extend(["--handoff-threshold", str(handoff_threshold)]) 

311 context_limit = getattr(args, "context_limit", None) 

312 if context_limit is not None: 

313 cmd.extend(["--context-limit", str(context_limit)]) 

314 

315 with open(log_file, "w") as log_fh: 

316 process = subprocess.Popen( 

317 cmd, 

318 start_new_session=True, 

319 stdout=log_fh, 

320 stderr=log_fh, 

321 stdin=subprocess.DEVNULL, 

322 ) 

323 

324 pid_file.write_text(str(process.pid)) 

325 print( 

326 f"Loop {colorize(loop_name, '1')} started in background (PID: {colorize(str(process.pid), '2')})" 

327 ) 

328 print(f" Log: {colorize(str(log_file), '2')}") 

329 print(f" Status: {colorize(f'll-loop status {loop_name}', '2')}") 

330 print(f" Stop: {colorize(f'll-loop stop {loop_name}', '2')}") 

331 return 0 

332 

333 

334def run_foreground( 

335 executor: Any, 

336 fsm: FSMLoop, 

337 args: argparse.Namespace, 

338 highlight_color: str = "32", 

339 edge_label_colors: dict[str, str] | None = None, 

340 badges: dict[str, str] | None = None, 

341) -> int: 

342 """Run loop with progress display. 

343 

344 Args: 

345 highlight_color: ANSI SGR code for the active FSM state highlight in verbose mode. 

346 edge_label_colors: Optional label→SGR-code mapping for transition edge labels. 

347 badges: Optional glyph-key→string mapping for state type badges in FSM diagrams. 

348 

349 Returns: 

350 Exit code (0 = success). 

351 """ 

352 quiet = getattr(args, "quiet", False) 

353 verbose = getattr(args, "verbose", False) 

354 show_diagrams = getattr(args, "show_diagrams", False) 

355 clear_screen = getattr(args, "clear", False) 

356 if not quiet: 

357 print(f"Running loop: {colorize(fsm.name, '1')}") 

358 print(f"Max iterations: {colorize(str(fsm.max_iterations), '2')}") 

359 print() 

360 

361 current_iteration = [0] # Use list to allow mutation in closure 

362 last_state_at_depth: dict[int, str] = {} # Track last known state per nesting depth 

363 child_fsm_stack: dict[int, FSMLoop | None] = {} # Active child FSM per depth 

364 loop_start_time = time.monotonic() 

365 

366 def display_progress(event: dict) -> None: 

367 """Display progress for events.""" 

368 event_type = event.get("event") 

369 depth = event.get("depth", 0) 

370 indent = " " * depth 

371 tw = terminal_width() 

372 max_line = tw - 8 - len(indent) 

373 

374 if event_type == "state_enter": 

375 current_iteration[0] = event.get("iteration", 0) 

376 state = event.get("state", "") 

377 if not quiet: 

378 elapsed_int = int(time.monotonic() - loop_start_time) 

379 if elapsed_int < 60: 

380 elapsed_str = f"{elapsed_int}s" 

381 else: 

382 elapsed_str = f"{elapsed_int // 60}m {elapsed_int % 60}s" 

383 if clear_screen and sys.stdout.isatty() and depth == 0: 

384 print("\033[2J\033[H", end="", flush=True) 

385 # Update last-known state at this depth and clear stale deeper entries 

386 last_state_at_depth[depth] = state 

387 for k in [k for k in last_state_at_depth if k > depth]: 

388 del last_state_at_depth[k] 

389 # Load child FSM for the current state at this depth 

390 parent_at_depth = fsm if depth == 0 else child_fsm_stack.get(depth - 1) 

391 if parent_at_depth is not None and state in parent_at_depth.states: 

392 fsm_state = parent_at_depth.states[state] 

393 if fsm_state.loop is not None: 

394 try: 

395 child_fsm_stack[depth] = load_loop( 

396 fsm_state.loop, executor.loops_dir, Logger() 

397 ) 

398 except (FileNotFoundError, ValueError): 

399 pass # leave child_fsm_stack[depth] unchanged on failure 

400 else: 

401 child_fsm_stack[depth] = None 

402 else: 

403 child_fsm_stack[depth] = None 

404 # Clear stale deeper child FSM entries 

405 for k in [k for k in child_fsm_stack if k > depth]: 

406 del child_fsm_stack[k] 

407 if show_diagrams: 

408 from little_loops.cli.loop.layout import _render_fsm_diagram 

409 

410 diagram = _render_fsm_diagram( 

411 fsm, 

412 highlight_state=last_state_at_depth.get(0), 

413 highlight_color=highlight_color, 

414 edge_label_colors=edge_label_colors, 

415 badges=badges, 

416 ) 

417 header_text = f"== loop: {fsm.name} " 

418 header = header_text + "=" * max(0, tw - len(header_text)) 

419 print(header, flush=True) 

420 print(diagram, flush=True) 

421 for d, child_fsm_at_d in sorted(child_fsm_stack.items()): 

422 if child_fsm_at_d is not None and (d + 1) in last_state_at_depth: 

423 child_name = child_fsm_at_d.name 

424 separator_text = f"\u2500\u2500 sub-loop: {child_name} " 

425 separator = separator_text + "\u2500" * max(0, tw - len(separator_text)) 

426 print(separator, flush=True) 

427 child_diagram = _render_fsm_diagram( 

428 child_fsm_at_d, 

429 highlight_state=last_state_at_depth.get(d + 1), 

430 highlight_color=highlight_color, 

431 edge_label_colors=edge_label_colors, 

432 badges=badges, 

433 ) 

434 print(child_diagram, flush=True) 

435 if not quiet: 

436 print( 

437 f"{indent}[{current_iteration[0]}/{fsm.max_iterations}] {colorize(state, '1')} ({colorize(elapsed_str, '2')})", 

438 end="", 

439 flush=True, 

440 ) 

441 

442 elif event_type == "action_start": 

443 if not quiet: 

444 action = event.get("action", "") 

445 is_prompt = event.get("is_prompt", False) 

446 if is_prompt: 

447 lines = action.strip().splitlines() 

448 line_count = len(lines) 

449 prompt_badge = "\u2726" # ✦ 

450 print( 

451 f"{indent} -> {colorize(prompt_badge, '2')} {colorize(f'({line_count} lines)', '2')}", 

452 flush=True, 

453 ) 

454 show_count = line_count if verbose else min(5, line_count) 

455 for line in lines[:show_count]: 

456 if verbose: 

457 print(f"{indent} {line}", flush=True) 

458 else: 

459 display = line[:max_line] + "..." if len(line) > max_line else line 

460 print(f"{indent} {display}", flush=True) 

461 if line_count > show_count: 

462 print( 

463 f"{indent} ... ({line_count - show_count} more lines)", flush=True 

464 ) 

465 else: 

466 if verbose: 

467 action_display = action 

468 else: 

469 action_display = ( 

470 action[:max_line] + "..." if len(action) > max_line else action 

471 ) 

472 print(f"{indent} -> {colorize(action_display, '2')}", flush=True) 

473 

474 elif event_type == "action_output": 

475 if not quiet: 

476 line = event.get("line", "") 

477 if line.strip(): 

478 print(f"{indent} {line}", flush=True) 

479 

480 elif event_type == "action_complete": 

481 if not quiet: 

482 duration_ms = event.get("duration_ms", 0) 

483 exit_code = event.get("exit_code", 0) 

484 duration_sec = duration_ms / 1000 

485 if duration_sec < 60: 

486 duration_str = f"{duration_sec:.1f}s" 

487 else: 

488 minutes = int(duration_sec // 60) 

489 seconds = duration_sec % 60 

490 duration_str = f"{minutes}m {seconds:.0f}s" 

491 parts = [f"{indent} ({colorize(duration_str, '2')})"] 

492 if exit_code == 124: 

493 parts.append(colorize("timed out", "38;5;208")) 

494 elif exit_code != 0: 

495 parts.append(colorize(f"exit: {exit_code}", "38;5;208")) 

496 print(" ".join(parts), flush=True) 

497 

498 elif event_type == "evaluate": 

499 if not quiet: 

500 verdict = event.get("verdict", "") 

501 confidence = event.get("confidence") 

502 reason = event.get("reason", "") 

503 error = event.get("error", "") 

504 _elc = edge_label_colors or {} 

505 if verdict in ("yes", "target", "progress"): 

506 _vc = _elc.get("yes", "32") 

507 symbol = colorize("\u2713", _vc) 

508 verdict_colored = colorize(verdict, _vc) 

509 elif verdict == "no": 

510 _vc = _elc.get("no", "38;5;208") 

511 symbol = colorize("\u2717", _vc) 

512 verdict_colored = colorize(verdict, _vc) 

513 elif verdict == "error": 

514 _vc = _elc.get("error", "38;5;208") 

515 symbol = colorize("\u2717", _vc) 

516 verdict_colored = colorize(verdict, _vc) 

517 else: 

518 symbol = colorize("\u2717", "38;5;208") 

519 verdict_colored = colorize(verdict, "2") 

520 # Build verdict line 

521 if error and verdict == "error": 

522 verdict_line = f"{symbol} {verdict_colored}: {error}" 

523 elif confidence is not None: 

524 verdict_line = ( 

525 f"{symbol} {verdict_colored} {colorize(f'({confidence:.2f})', '2')}" 

526 ) 

527 else: 

528 verdict_line = f"{symbol} {verdict_colored}" 

529 print(f"{indent} {verdict_line}", flush=True) 

530 # Show raw_preview for error verdicts to aid diagnosis 

531 raw_preview = event.get("raw_preview", "") 

532 if raw_preview and verdict == "error": 

533 if verbose: 

534 sub_lines = raw_preview.splitlines() or [""] 

535 first, rest = sub_lines[0], sub_lines[1:] 

536 print(f"{indent} raw: {first}", flush=True) 

537 for sub in rest: 

538 print(f"{indent} {sub}", flush=True) 

539 else: 

540 print(f"{indent} raw: {raw_preview[:200]}", flush=True) 

541 # Show reason on a second line if present (and not already shown as error) 

542 if reason and not (error and verdict == "error"): 

543 if verbose: 

544 for sub in reason.splitlines() or [""]: 

545 print(f"{indent} {sub}", flush=True) 

546 else: 

547 reason_display = reason[:300] + "..." if len(reason) > 300 else reason 

548 print(f"{indent} {reason_display}", flush=True) 

549 

550 elif event_type == "route": 

551 if not quiet: 

552 to_state = event.get("to", "") 

553 print(f"{indent} {colorize('->', '2')} {colorize(to_state, '1')}", flush=True) 

554 

555 # Wire progress display via the EventBus on PersistentExecutor 

556 if not quiet or show_diagrams: 

557 if hasattr(executor, "event_bus"): 

558 executor.event_bus.register(display_progress) 

559 else: 

560 executor._on_event = display_progress 

561 

562 # Enter alternate screen buffer when showing diagrams with clear to prevent 

563 # scrollback contamination from diagrams taller than the terminal height. 

564 global _using_alt_screen 

565 if show_diagrams and clear_screen and sys.stdout.isatty(): 

566 _using_alt_screen = True 

567 print("\033[?1049h\033[H", end="", flush=True) 

568 

569 try: 

570 result = executor.run() 

571 finally: 

572 if _using_alt_screen: 

573 print("\033[?1049l", end="", flush=True) 

574 _using_alt_screen = False 

575 

576 if not quiet: 

577 print() 

578 duration_sec = result.duration_ms / 1000 

579 if duration_sec < 60: 

580 duration_str = f"{duration_sec:.1f}s" 

581 else: 

582 minutes = int(duration_sec // 60) 

583 seconds = duration_sec % 60 

584 duration_str = f"{minutes}m {seconds:.0f}s" 

585 if result.terminated_by == "terminal": 

586 state_colored = colorize(result.final_state, "32") 

587 else: 

588 state_colored = colorize(result.final_state, "38;5;208") 

589 print(f"Loop completed: {state_colored} ({result.iterations} iterations, {duration_str})") 

590 

591 return EXIT_CODES.get(result.terminated_by, 1)