Coverage for little_loops / cli / loop / _helpers.py: 96%
366 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""Shared helpers for ll-loop CLI subcommands."""
3from __future__ import annotations
5import argparse
6import json
7import signal
8import subprocess
9import sys
10import time
11from datetime import datetime
12from pathlib import Path
13from types import FrameType
14from typing import TYPE_CHECKING, Any
16from little_loops.cli.output import colorize, terminal_width
17from little_loops.fsm.concurrency import _process_alive
18from little_loops.logger import Logger
20if TYPE_CHECKING:
21 from little_loops.fsm.schema import FSMLoop
# Exit code mapping for terminated_by values.
# run_foreground() looks the executor result's ``terminated_by`` up here and
# falls back to 1 for any value that is not listed.
EXIT_CODES: dict[str, int] = {
    "terminal": 0,
    "signal": 0,
    "handoff": 0,
    "max_iterations": 1,
    "timeout": 1,
    "cycle_detected": 1,
}

# Module-level shutdown state for signal handling.
# Set to True by the first signal handled by _loop_signal_handler; a signal
# arriving while it is True forces an immediate exit. Reset to False by
# register_loop_signal_handlers().
_loop_shutdown_requested: bool = False
# Executor whose request_shutdown() the signal handler calls (None until
# register_loop_signal_handlers() stores one).
_loop_executor: Any = None
# PID file removed by the signal handler on forced exit, if one was registered.
_loop_pid_file: Path | None = None
# True while run_foreground() has switched the terminal to the alternate
# screen buffer; the signal handler emits the "leave" sequence on forced exit.
_using_alt_screen: bool = False
def _loop_signal_handler(signum: int, frame: FrameType | None) -> None:
    """Two-stage shutdown callback installed for ll-loop signals.

    First delivery: record the shutdown request, print a notice to stderr,
    call ``request_shutdown()`` on the registered executor, and kill any child
    process currently tracked by the inner executor or its action runner.

    Second delivery: remove the registered PID file (if any), leave the
    alternate screen buffer (if it is in use), print a notice to stderr and
    exit the process with status 1.

    Args:
        signum: Number of the delivered signal (not inspected).
        frame: Interrupted stack frame (not inspected).
    """
    global _loop_shutdown_requested, _using_alt_screen

    if _loop_shutdown_requested:
        # Repeat signal - the user is done waiting, so clean up and bail out.
        pid_path = _loop_pid_file
        if pid_path is not None:
            pid_path.unlink(missing_ok=True)
        if _using_alt_screen:
            print("\033[?1049l", end="", file=sys.stderr, flush=True)
        print(colorize("\nForce shutdown requested", "38;5;208"), file=sys.stderr)
        sys.exit(1)

    _loop_shutdown_requested = True
    print(colorize("\nShutdown requested, will exit after current state...", "33"), file=sys.stderr)

    executor = _loop_executor
    if executor is None:
        return
    executor.request_shutdown()

    inner = getattr(executor, "_executor", None)
    if inner is None:
        return

    # A blocking child may be tracked on the action runner or directly on the
    # inner executor (the _run_subprocess path); kill whichever is present,
    # runner first.
    for owner in (getattr(inner, "action_runner", None), inner):
        if owner is None:
            continue
        child = getattr(owner, "_current_process", None)
        if child is not None:
            child.kill()
def _is_earliest_waiter(entry_id: str, queue_dir: Path) -> bool:
    """Return True if entry_id is the earliest-enqueued waiter in queue_dir.

    Returns True when this waiter is first or the queue is empty/unreadable,
    allowing it to proceed with acquire(). Non-first waiters return False and
    should back off to yield to the earlier waiter (ENH-1332).

    Stale entries (dead PIDs) are removed on the fly so orphaned queue files
    from crashed processes do not block live waiters indefinitely (BUG-1360).

    Queue files that cannot be read, cannot be decoded, do not contain valid
    JSON, or whose top-level JSON value is not an object are skipped rather
    than aborting the check.

    Args:
        entry_id: The ``id`` of the calling waiter's queue entry.
        queue_dir: Directory holding one ``*.json`` file per queued waiter.

    Returns:
        True if the caller may proceed, False if an earlier live waiter exists.
    """
    if not queue_dir.exists():
        return True
    entries: list[dict] = []
    for entry_file in queue_dir.glob("*.json"):
        try:
            with open(entry_file) as fh:
                data = json.load(fh)
            # Valid JSON is not necessarily an object ("[]", "null", "3");
            # anything else cannot be a queue entry, so ignore it.
            if not isinstance(data, dict):
                continue
            context = data.get("context")
            # A missing or non-object context simply means "no PID recorded".
            pid = context.get("pid") if isinstance(context, dict) else None
            if pid is not None and not _process_alive(pid):
                entry_file.unlink(missing_ok=True)
                continue
            entries.append(data)
        except (ValueError, KeyError, OSError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError;
            # OSError covers files that vanished or cannot be read/removed.
            continue
    if not entries:
        return True
    entries.sort(key=lambda d: d.get("enqueuedAt", ""))
    return entries[0].get("id") == entry_id
def register_loop_signal_handlers(executor: Any, pid_file: Path | None = None) -> None:
    """Install the ll-loop shutdown handler for SIGINT and SIGTERM.

    Resets the module-level shutdown flag, remembers the executor and PID
    file for the handler to use, then routes both signals to
    ``_loop_signal_handler``. With the handler installed, Ctrl-C requests a
    graceful shutdown (``executor.request_shutdown()``) instead of raising
    KeyboardInterrupt; a second Ctrl-C forces an immediate exit and removes
    the PID file.

    Args:
        executor: The PersistentExecutor instance to request shutdown on.
        pid_file: Optional path to PID file to clean up on forced exit.
    """
    global _loop_shutdown_requested, _loop_executor, _loop_pid_file

    # Start every registration from a clean "no shutdown requested" state.
    _loop_shutdown_requested, _loop_executor, _loop_pid_file = False, executor, pid_file

    for handled_signal in (signal.SIGINT, signal.SIGTERM):
        signal.signal(handled_signal, _loop_signal_handler)
def get_builtin_loops_dir() -> Path:
    """Return the directory of built-in loops bundled with the plugin.

    The location is the ``loops`` directory under the third ancestor
    directory of this module's file.
    """
    module_file = Path(__file__)
    bundle_root = module_file.parent.parent.parent
    return bundle_root / "loops"
def resolve_loop_path(name_or_path: str, loops_dir: Path) -> Path:
    """Resolve a loop name or path to the loop definition file.

    Lookup order:
        1. ``name_or_path`` itself, if it exists and is not a directory.
        2. ``<loops_dir>/<name>.fsm.yaml`` (compiled FSM).
        3. ``<loops_dir>/<name>.yaml``.
        4. ``<builtin loops dir>/<name>.yaml``.

    Args:
        name_or_path: Loop name, or a path to a loop definition file.
        loops_dir: Project directory searched for named loops.

    Returns:
        Path to the first matching candidate.

    Raises:
        FileNotFoundError: If no candidate exists.
    """
    direct = Path(name_or_path)
    # A directory can never be a loop definition. Without this guard a loop
    # whose name matches a directory in the working directory (e.g. "docs")
    # would resolve to that directory and fail later with IsADirectoryError
    # instead of falling through to the named-loop lookup.
    if direct.exists() and not direct.is_dir():
        return direct

    # Project loops: the compiled FSM takes precedence over the plain YAML.
    for suffix in (".fsm.yaml", ".yaml"):
        candidate = loops_dir / f"{name_or_path}{suffix}"
        if candidate.exists():
            return candidate

    # Fall back to built-in loops from plugin directory
    builtin_path = get_builtin_loops_dir() / f"{name_or_path}.yaml"
    if builtin_path.exists():
        return builtin_path

    raise FileNotFoundError(f"Loop not found: {name_or_path}")
def load_loop(name_or_path: str, loops_dir: Path, logger: Logger) -> FSMLoop:
    """Resolve a loop by name or path, then load and validate it.

    Args:
        name_or_path: Loop name or path, resolved via ``resolve_loop_path``.
        loops_dir: Project directory searched for named loops.
        logger: Accepted for interface compatibility; not used here.

    Returns:
        The first element of the pair returned by ``load_and_validate``.

    Raises:
        FileNotFoundError: If loop not found.
        ValueError: If loop is invalid.
    """
    # Imported here rather than at module level, as in the original layout.
    from little_loops.fsm.validation import load_and_validate

    loop_file = resolve_loop_path(name_or_path, loops_dir)
    validated_loop, _unused = load_and_validate(loop_file)
    return validated_loop
def load_loop_with_spec(
    name_or_path: str, loops_dir: Path, logger: Logger
) -> tuple[FSMLoop, dict[str, Any]]:
    """Load a loop and also return the raw YAML document it came from.

    Used by commands that need access to raw YAML fields (e.g., description).
    The file is first parsed with ``yaml.safe_load`` and then passed to
    ``load_and_validate``.

    Args:
        name_or_path: Loop name or path, resolved via ``resolve_loop_path``.
        loops_dir: Project directory searched for named loops.
        logger: Accepted for interface compatibility; not used here.

    Returns:
        ``(fsm, spec)`` - the validated loop and the parsed YAML content.

    Raises:
        FileNotFoundError: If loop not found.
        ValueError: If loop is invalid.
    """
    import yaml

    from little_loops.fsm.validation import load_and_validate

    loop_file = resolve_loop_path(name_or_path, loops_dir)

    with open(loop_file) as stream:
        raw_spec = yaml.safe_load(stream)

    validated_loop, _unused = load_and_validate(loop_file)
    return validated_loop, raw_spec
def print_execution_plan(fsm: FSMLoop, edge_label_colors: dict[str, str] | None = None) -> None:
    """Print the dry-run execution plan of ``fsm`` to stdout.

    For each state this prints its name (flagged when terminal), a preview of
    its action, its evaluator type and its outgoing transitions. A summary of
    the initial state, iteration limit, timeout and context follows.

    Args:
        fsm: Loop whose states and settings are listed.
        edge_label_colors: Optional label->SGR-code mapping; only the "yes"
            entry is consulted, to colour the terminal marker (default "32").
    """
    palette = edge_label_colors or {}
    terminal_color = palette.get("yes", "32")
    width = terminal_width()

    print(colorize(f"Execution plan for: {fsm.name}", "1"))
    print()
    print("States:")
    for state_name, spec in fsm.states.items():
        marker = ""
        if spec.terminal:
            marker = colorize(" [TERMINAL]", terminal_color)
        print(f" {colorize(f'[{state_name}]', '1')}{marker}")

        if spec.action:
            if spec.action_type != "prompt":
                # Single-line actions are cut to fit the terminal width.
                limit = width - 16
                shown = spec.action
                if len(shown) > limit:
                    shown = shown[:limit] + "..."
                print(f" action: {shown}")
            else:
                # Prompts get a three-line preview in block form.
                prompt_lines = spec.action.strip().splitlines()
                preview = "\n ".join(prompt_lines[:3])
                if len(prompt_lines) > 3 or len(spec.action) > 200:
                    preview += " ..."
                print(f" action: |\n {preview}")

        if spec.evaluate:
            print(f" evaluate: {spec.evaluate.type}")

        if spec.on_yes:
            print(f" on_yes {colorize('->', '2')} {colorize(spec.on_yes, '2')}")
        if spec.on_no:
            print(f" on_no {colorize('->', '2')} {colorize(spec.on_no, '2')}")
        if spec.on_error:
            print(f" on_error {colorize('->', '2')} {colorize(spec.on_error, '2')}")
        if spec.next:
            print(f" next {colorize('->', '2')} {colorize(spec.next, '2')}")

        if spec.route:
            print(" route:")
            for verdict, destination in spec.route.routes.items():
                print(f" {verdict} {colorize('->', '2')} {colorize(destination, '2')}")
            if spec.route.default:
                print(f" _ {colorize('->', '2')} {colorize(spec.route.default, '2')}")
        print()

    print(f"Initial state: {fsm.initial}")
    print(f"Max iterations: {fsm.max_iterations}")
    if fsm.timeout:
        print(f"Timeout: {fsm.timeout}s")
    if fsm.context:
        print("Context:")
        for ctx_key, ctx_value in fsm.context.items():
            print(f" {ctx_key}: {ctx_value!r}")
def _make_instance_id(loop_name: str) -> str:
    """Build an instance ID of the form ``<loop_name>-<YYYYmmddTHHMMSS>``.

    The timestamp is the current local time at one-second resolution, so two
    calls for the same loop name within the same second produce the same ID.
    """
    started_at = datetime.now()
    return f"{loop_name}-{started_at:%Y%m%dT%H%M%S}"
def run_background(
    loop_name: str, args: argparse.Namespace, loops_dir: Path, subcommand: str = "run"
) -> int:
    """Launch loop as a detached background process.

    Re-executes ``python -m little_loops.cli.loop <subcommand> <loop_name>``
    with ``--foreground-internal`` and a freshly generated ``--instance-id``,
    in a new session (``start_new_session=True``), with stdout and stderr
    redirected to ``<loops_dir>/.running/<instance_id>.log``. The parent then
    writes the child's PID to ``<instance_id>.pid`` in the same directory,
    prints a short summary and returns immediately.

    Args:
        loop_name: Name of the loop to run; passed through to the child.
        args: Parsed CLI arguments; recognised options are forwarded.
        loops_dir: Loops directory; its ``.running`` subdirectory holds the
            PID and log files (created if missing).
        subcommand: The ll-loop subcommand to spawn ("run" or "resume").

    Returns:
        Exit code (0 = launched successfully).
    """
    running_dir = loops_dir / ".running"
    running_dir.mkdir(parents=True, exist_ok=True)

    instance_id = _make_instance_id(loop_name)
    pid_path = running_dir / f"{instance_id}.pid"
    log_path = running_dir / f"{instance_id}.log"

    # Build re-exec command with --foreground-internal instead of --background
    command: list[str] = [sys.executable, "-m", "little_loops.cli.loop", subcommand, loop_name]
    positional_input = getattr(args, "input", None)
    if positional_input is not None:
        command.append(positional_input)
    command += ["--foreground-internal", "--instance-id", instance_id]

    # Forward relevant args (argv order matches the original implementation).
    max_iterations = getattr(args, "max_iterations", None)
    if max_iterations:
        command += ["--max-iterations", str(max_iterations)]
    if getattr(args, "no_llm", False):
        command.append("--no-llm")
    model = getattr(args, "llm_model", None)
    if model:
        command += ["--llm-model", model]

    # Plain boolean switches: forwarded when truthy.
    for attr, flag in (
        ("verbose", "--verbose"),
        ("show_diagrams", "--show-diagrams"),
        ("quiet", "--quiet"),
        ("queue", "--queue"),
    ):
        if getattr(args, attr, False):
            command.append(flag)

    for pair in getattr(args, "context", None) or []:
        command += ["--context", pair]

    # Valued options: forwarded whenever they were given (even if falsy).
    for attr, flag in (
        ("program_md", "--program-md"),
        ("delay", "--delay"),
        ("handoff_threshold", "--handoff-threshold"),
        ("context_limit", "--context-limit"),
    ):
        value = getattr(args, attr, None)
        if value is not None:
            command += [flag, str(value)]

    with open(log_path, "w") as log_stream:
        child = subprocess.Popen(
            command,
            start_new_session=True,
            stdout=log_stream,
            stderr=log_stream,
            stdin=subprocess.DEVNULL,
        )

    pid_text = str(child.pid)
    pid_path.write_text(pid_text)
    print(
        f"Loop {colorize(loop_name, '1')} started in background (PID: {colorize(pid_text, '2')})"
    )
    print(f" Log: {colorize(str(log_path), '2')}")
    print(f" Status: {colorize(f'll-loop status {loop_name}', '2')}")
    print(f" Stop: {colorize(f'll-loop stop {loop_name}', '2')}")
    return 0
def run_foreground(
    executor: Any,
    fsm: FSMLoop,
    args: argparse.Namespace,
    highlight_color: str = "32",
    edge_label_colors: dict[str, str] | None = None,
    badges: dict[str, str] | None = None,
) -> int:
    """Run loop with progress display.

    Registers a progress callback on the executor (via ``executor.event_bus``
    when present, otherwise by assigning ``executor._on_event``), calls
    ``executor.run()`` and prints a completion summary unless quiet.

    Args:
        executor: Object providing ``run()``; ``loops_dir`` is read from it
            when a state references a sub-loop.
        fsm: The top-level loop being run; used for names, limits and diagrams.
        args: Parsed CLI arguments; the optional attributes ``quiet``,
            ``verbose``, ``show_diagrams`` and ``clear`` are read (each
            defaults to False).
        highlight_color: ANSI SGR code for the active FSM state highlight in verbose mode.
        edge_label_colors: Optional label→SGR-code mapping for transition edge labels.
        badges: Optional glyph-key→string mapping for state type badges in FSM diagrams.

    Returns:
        Exit code (0 = success): ``EXIT_CODES[result.terminated_by]``, or 1
        when ``terminated_by`` is not a key of ``EXIT_CODES``.
    """
    quiet = getattr(args, "quiet", False)
    verbose = getattr(args, "verbose", False)
    show_diagrams = getattr(args, "show_diagrams", False)
    clear_screen = getattr(args, "clear", False)
    if not quiet:
        print(f"Running loop: {colorize(fsm.name, '1')}")
        print(f"Max iterations: {colorize(str(fsm.max_iterations), '2')}")
        print()

    current_iteration = [0]  # Use list to allow mutation in closure
    last_state_at_depth: dict[int, str] = {}  # Track last known state per nesting depth
    child_fsm_stack: dict[int, FSMLoop | None] = {}  # Active child FSM per depth
    loop_start_time = time.monotonic()

    def display_progress(event: dict) -> None:
        """Display progress for events.

        Dispatches on ``event["event"]``; event types other than the ones
        handled below are ignored. ``event["depth"]`` (default 0) sets the
        indentation of everything printed for the event.
        """
        event_type = event.get("event")
        depth = event.get("depth", 0)
        indent = " " * depth
        tw = terminal_width()
        # Budget for truncating long lines in non-verbose mode.
        max_line = tw - 8 - len(indent)

        if event_type == "state_enter":
            current_iteration[0] = event.get("iteration", 0)
            state = event.get("state", "")
            if not quiet:
                # Elapsed wall time since run_foreground started, as "Ns" or "Mm Ns".
                elapsed_int = int(time.monotonic() - loop_start_time)
                if elapsed_int < 60:
                    elapsed_str = f"{elapsed_int}s"
                else:
                    elapsed_str = f"{elapsed_int // 60}m {elapsed_int % 60}s"
            # Clear the screen only for top-level state changes on a real terminal.
            if clear_screen and sys.stdout.isatty() and depth == 0:
                print("\033[2J\033[H", end="", flush=True)
            # Update last-known state at this depth and clear stale deeper entries
            last_state_at_depth[depth] = state
            for k in [k for k in last_state_at_depth if k > depth]:
                del last_state_at_depth[k]
            # Load child FSM for the current state at this depth
            parent_at_depth = fsm if depth == 0 else child_fsm_stack.get(depth - 1)
            if parent_at_depth is not None and state in parent_at_depth.states:
                fsm_state = parent_at_depth.states[state]
                if fsm_state.loop is not None:
                    try:
                        child_fsm_stack[depth] = load_loop(
                            fsm_state.loop, executor.loops_dir, Logger()
                        )
                    except (FileNotFoundError, ValueError):
                        pass  # leave child_fsm_stack[depth] unchanged on failure
                else:
                    child_fsm_stack[depth] = None
            else:
                child_fsm_stack[depth] = None
            # Clear stale deeper child FSM entries
            for k in [k for k in child_fsm_stack if k > depth]:
                del child_fsm_stack[k]
            if show_diagrams:
                from little_loops.cli.loop.layout import _render_fsm_diagram

                # Top-level diagram first, highlighting the last depth-0 state.
                diagram = _render_fsm_diagram(
                    fsm,
                    highlight_state=last_state_at_depth.get(0),
                    highlight_color=highlight_color,
                    edge_label_colors=edge_label_colors,
                    badges=badges,
                )
                header_text = f"== loop: {fsm.name} "
                header = header_text + "=" * max(0, tw - len(header_text))
                print(header, flush=True)
                print(diagram, flush=True)
                # Then one diagram per active sub-loop, in depth order; a child
                # stored at depth d is highlighted with the state seen at d + 1.
                for d, child_fsm_at_d in sorted(child_fsm_stack.items()):
                    if child_fsm_at_d is not None and (d + 1) in last_state_at_depth:
                        child_name = child_fsm_at_d.name
                        separator_text = f"\u2500\u2500 sub-loop: {child_name} "
                        separator = separator_text + "\u2500" * max(0, tw - len(separator_text))
                        print(separator, flush=True)
                        child_diagram = _render_fsm_diagram(
                            child_fsm_at_d,
                            highlight_state=last_state_at_depth.get(d + 1),
                            highlight_color=highlight_color,
                            edge_label_colors=edge_label_colors,
                            badges=badges,
                        )
                        print(child_diagram, flush=True)
            if not quiet:
                # No trailing newline: later events continue from this line.
                print(
                    f"{indent}[{current_iteration[0]}/{fsm.max_iterations}] {colorize(state, '1')} ({colorize(elapsed_str, '2')})",
                    end="",
                    flush=True,
                )

        elif event_type == "action_start":
            if not quiet:
                action = event.get("action", "")
                is_prompt = event.get("is_prompt", False)
                if is_prompt:
                    lines = action.strip().splitlines()
                    line_count = len(lines)
                    prompt_badge = "\u2726"  # ✦
                    print(
                        f"{indent} -> {colorize(prompt_badge, '2')} {colorize(f'({line_count} lines)', '2')}",
                        flush=True,
                    )
                    # Verbose shows the whole prompt; otherwise at most 5
                    # lines, each truncated to max_line characters.
                    show_count = line_count if verbose else min(5, line_count)
                    for line in lines[:show_count]:
                        if verbose:
                            print(f"{indent} {line}", flush=True)
                        else:
                            display = line[:max_line] + "..." if len(line) > max_line else line
                            print(f"{indent} {display}", flush=True)
                    if line_count > show_count:
                        print(
                            f"{indent} ... ({line_count - show_count} more lines)", flush=True
                        )
                else:
                    if verbose:
                        action_display = action
                    else:
                        action_display = (
                            action[:max_line] + "..." if len(action) > max_line else action
                        )
                    print(f"{indent} -> {colorize(action_display, '2')}", flush=True)

        elif event_type == "action_output":
            if not quiet:
                line = event.get("line", "")
                # Whitespace-only output lines are dropped.
                if line.strip():
                    print(f"{indent} {line}", flush=True)

        elif event_type == "action_complete":
            if not quiet:
                duration_ms = event.get("duration_ms", 0)
                exit_code = event.get("exit_code", 0)
                duration_sec = duration_ms / 1000
                if duration_sec < 60:
                    duration_str = f"{duration_sec:.1f}s"
                else:
                    minutes = int(duration_sec // 60)
                    seconds = duration_sec % 60
                    duration_str = f"{minutes}m {seconds:.0f}s"
                parts = [f"{indent} ({colorize(duration_str, '2')})"]
                # Exit code 124 is reported as a timeout; any other non-zero
                # code is shown verbatim.
                if exit_code == 124:
                    parts.append(colorize("timed out", "38;5;208"))
                elif exit_code != 0:
                    parts.append(colorize(f"exit: {exit_code}", "38;5;208"))
                print(" ".join(parts), flush=True)

        elif event_type == "evaluate":
            if not quiet:
                verdict = event.get("verdict", "")
                confidence = event.get("confidence")
                reason = event.get("reason", "")
                error = event.get("error", "")
                _elc = edge_label_colors or {}
                # "yes", "target" and "progress" share the "yes" colour and a
                # check mark; every other verdict gets a cross.
                if verdict in ("yes", "target", "progress"):
                    _vc = _elc.get("yes", "32")
                    symbol = colorize("\u2713", _vc)
                    verdict_colored = colorize(verdict, _vc)
                elif verdict == "no":
                    _vc = _elc.get("no", "38;5;208")
                    symbol = colorize("\u2717", _vc)
                    verdict_colored = colorize(verdict, _vc)
                elif verdict == "error":
                    _vc = _elc.get("error", "38;5;208")
                    symbol = colorize("\u2717", _vc)
                    verdict_colored = colorize(verdict, _vc)
                else:
                    symbol = colorize("\u2717", "38;5;208")
                    verdict_colored = colorize(verdict, "2")
                # Build verdict line
                if error and verdict == "error":
                    verdict_line = f"{symbol} {verdict_colored}: {error}"
                elif confidence is not None:
                    verdict_line = (
                        f"{symbol} {verdict_colored} {colorize(f'({confidence:.2f})', '2')}"
                    )
                else:
                    verdict_line = f"{symbol} {verdict_colored}"
                print(f"{indent} {verdict_line}", flush=True)
                # Show raw_preview for error verdicts to aid diagnosis
                raw_preview = event.get("raw_preview", "")
                if raw_preview and verdict == "error":
                    if verbose:
                        sub_lines = raw_preview.splitlines() or [""]
                        first, rest = sub_lines[0], sub_lines[1:]
                        print(f"{indent} raw: {first}", flush=True)
                        for sub in rest:
                            print(f"{indent} {sub}", flush=True)
                    else:
                        # Non-verbose: cap the preview at 200 characters.
                        print(f"{indent} raw: {raw_preview[:200]}", flush=True)
                # Show reason on a second line if present (and not already shown as error)
                if reason and not (error and verdict == "error"):
                    if verbose:
                        for sub in reason.splitlines() or [""]:
                            print(f"{indent} {sub}", flush=True)
                    else:
                        # Non-verbose: cap the reason at 300 characters.
                        reason_display = reason[:300] + "..." if len(reason) > 300 else reason
                        print(f"{indent} {reason_display}", flush=True)

        elif event_type == "route":
            if not quiet:
                to_state = event.get("to", "")
                print(f"{indent} {colorize('->', '2')} {colorize(to_state, '1')}", flush=True)

    # Wire progress display via the EventBus on PersistentExecutor
    # (skipped entirely when quiet and no diagrams were requested).
    if not quiet or show_diagrams:
        if hasattr(executor, "event_bus"):
            executor.event_bus.register(display_progress)
        else:
            executor._on_event = display_progress

    # Enter alternate screen buffer when showing diagrams with clear to prevent
    # scrollback contamination from diagrams taller than the terminal height.
    global _using_alt_screen
    if show_diagrams and clear_screen and sys.stdout.isatty():
        _using_alt_screen = True
        print("\033[?1049h\033[H", end="", flush=True)

    try:
        result = executor.run()
    finally:
        # Always leave the alternate screen, even if run() raised.
        if _using_alt_screen:
            print("\033[?1049l", end="", flush=True)
            _using_alt_screen = False

    if not quiet:
        print()
        duration_sec = result.duration_ms / 1000
        if duration_sec < 60:
            duration_str = f"{duration_sec:.1f}s"
        else:
            minutes = int(duration_sec // 60)
            seconds = duration_sec % 60
            duration_str = f"{minutes}m {seconds:.0f}s"
        # SGR "32" only for a "terminal" termination; "38;5;208" otherwise.
        if result.terminated_by == "terminal":
            state_colored = colorize(result.final_state, "32")
        else:
            state_colored = colorize(result.final_state, "38;5;208")
        print(f"Loop completed: {state_colored} ({result.iterations} iterations, {duration_str})")

    return EXIT_CODES.get(result.terminated_by, 1)