Coverage for little_loops / fsm / runners.py: 90%
109 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""FSM action runner implementations.
3Provides the protocol and concrete implementations for action execution:
4- ActionRunner: Protocol defining the runner interface
5- DefaultActionRunner: Subprocess-based runner for real execution
6- SimulationActionRunner: Interactive/scenario-based runner for testing and dry-runs
7"""
9from __future__ import annotations
11import subprocess
12import sys
13import threading
14import time
15from collections.abc import Callable
16from dataclasses import dataclass, field
17from typing import Protocol
19from little_loops.fsm.types import ActionResult
20from little_loops.subprocess_utils import run_claude_command
23def _now_ms() -> int:
24 """Get current time in milliseconds."""
25 return int(time.time() * 1000)
28class ActionRunner(Protocol):
29 """Protocol for action execution."""
31 def run(
32 self,
33 action: str,
34 timeout: int,
35 is_slash_command: bool,
36 on_output_line: Callable[[str], None] | None = None,
37 agent: str | None = None,
38 tools: list[str] | None = None,
39 ) -> ActionResult:
40 """Execute an action and return the result.
42 Args:
43 action: The command to execute
44 timeout: Timeout in seconds
45 is_slash_command: True if this is a slash command (starts with /)
46 on_output_line: Optional callback invoked for each output line
47 agent: Optional agent name to pass as --agent to Claude CLI (prompt-mode only)
48 tools: Optional list of tool names to pass as --tools CSV to Claude CLI (prompt-mode only)
50 Returns:
51 ActionResult with output, stderr, exit_code, duration_ms
52 """
53 ...
56class DefaultActionRunner:
57 """Execute actions via subprocess or Claude CLI."""
59 def __init__(self) -> None:
60 self._current_process: subprocess.Popen[str] | None = None
62 def run(
63 self,
64 action: str,
65 timeout: int,
66 is_slash_command: bool,
67 on_output_line: Callable[[str], None] | None = None,
68 agent: str | None = None,
69 tools: list[str] | None = None,
70 ) -> ActionResult:
71 """Execute action and return result, streaming output line by line.
73 Args:
74 action: The command to execute
75 timeout: Timeout in seconds
76 is_slash_command: True if action starts with /
77 on_output_line: Optional callback invoked for each stdout line
78 agent: Optional agent name to pass as --agent to Claude CLI (prompt-mode only)
79 tools: Optional list of tool names to pass as --tools CSV (prompt-mode only)
81 Returns:
82 ActionResult with execution details
83 """
84 start = _now_ms()
86 if is_slash_command:
87 # Execute via Claude CLI using run_claude_command() so that the
88 # subprocess loads the full plugin/tool context (including deferred
89 # tools like Skill). The old --no-session-persistence path prevented
90 # ToolSearch from resolving deferred tool schemas (BUG-946).
91 def _stream_cb(line: str, is_stderr: bool) -> None:
92 if not is_stderr and on_output_line:
93 on_output_line(line)
95 def _on_proc_start(p: subprocess.Popen[str]) -> None:
96 self._current_process = p
98 def _on_proc_end(p: subprocess.Popen[str]) -> None:
99 self._current_process = None
101 try:
102 completed = run_claude_command(
103 command=action,
104 timeout=timeout,
105 stream_callback=_stream_cb,
106 on_process_start=_on_proc_start,
107 on_process_end=_on_proc_end,
108 agent=agent,
109 tools=tools,
110 )
111 except subprocess.TimeoutExpired:
112 return ActionResult(
113 output="",
114 stderr="Action timed out",
115 exit_code=124,
116 duration_ms=timeout * 1000,
117 )
118 return ActionResult(
119 output=completed.stdout,
120 stderr=completed.stderr,
121 exit_code=completed.returncode,
122 duration_ms=_now_ms() - start,
123 )
125 # Shell command
126 cmd = ["bash", "-c", action]
127 process = subprocess.Popen(
128 cmd,
129 stdout=subprocess.PIPE,
130 stderr=subprocess.PIPE,
131 text=True,
132 )
133 self._current_process = process
134 output_chunks: list[str] = []
135 stderr_chunks: list[str] = []
137 def _drain_stderr() -> None:
138 assert process.stderr is not None
139 for line in process.stderr:
140 stderr_chunks.append(line)
142 stderr_thread = threading.Thread(target=_drain_stderr, daemon=True)
143 stderr_thread.start()
145 try:
146 for line in process.stdout: # type: ignore[union-attr]
147 output_chunks.append(line)
148 if on_output_line:
149 on_output_line(line.rstrip())
150 process.wait(timeout=timeout)
151 except subprocess.TimeoutExpired:
152 process.kill()
153 process.wait()
154 stderr_thread.join(timeout=5)
155 return ActionResult(
156 output="".join(output_chunks),
157 stderr="".join(stderr_chunks) or "Action timed out",
158 exit_code=124,
159 duration_ms=timeout * 1000,
160 )
161 finally:
162 self._current_process = None
163 stderr_thread.join(timeout=5)
164 stderr = "".join(stderr_chunks)
165 return ActionResult(
166 output="".join(output_chunks),
167 stderr=stderr,
168 exit_code=process.returncode,
169 duration_ms=_now_ms() - start,
170 )
173@dataclass
174class SimulationActionRunner:
175 """Action runner for simulation mode - prompts user instead of executing.
177 This runner allows users to trace through FSM logic without executing
178 real commands. It can either prompt interactively for results or use
179 predefined scenarios.
181 Attributes:
182 scenario: Predefined result pattern ("all-pass", "all-fail", "first-fail", "alternating")
183 call_count: Number of actions simulated so far
184 calls: List of all actions that would have been executed
185 """
187 scenario: str | None = None
188 call_count: int = 0
189 calls: list[str] = field(default_factory=list)
191 def run(
192 self,
193 action: str,
194 timeout: int,
195 is_slash_command: bool,
196 on_output_line: Callable[[str], None] | None = None,
197 agent: str | None = None,
198 tools: list[str] | None = None,
199 ) -> ActionResult:
200 """Prompt user for simulated result instead of executing.
202 Args:
203 action: The command that would be executed
204 timeout: Timeout (ignored in simulation)
205 is_slash_command: Whether this is a slash command
206 on_output_line: Ignored in simulation
207 agent: Ignored in simulation
208 tools: Ignored in simulation
210 Returns:
211 ActionResult with simulated exit code
212 """
213 del timeout, on_output_line, agent, tools # unused in simulation
214 self.calls.append(action)
215 self.call_count += 1
217 cmd_type = "slash command" if is_slash_command else "shell command"
218 print(f" [SIMULATED] Would execute ({cmd_type}): {action}")
220 if self.scenario:
221 exit_code = self._scenario_result()
222 scenario_label = {
223 "all-pass": "Success (scenario: all-pass)",
224 "all-fail": "Failure (scenario: all-fail)",
225 "all-error": "Error (scenario: all-error)",
226 "first-fail": "Failure" if self.call_count == 1 else "Success",
227 "alternating": "Failure" if self.call_count % 2 == 1 else "Success",
228 }.get(self.scenario, "Success")
229 print(f" [AUTO] Result: {scenario_label}")
230 else:
231 exit_code = self._prompt_result()
233 return ActionResult(
234 output=f"[simulated output for: {action}]",
235 stderr="",
236 exit_code=exit_code,
237 duration_ms=0,
238 )
240 def _scenario_result(self) -> int:
241 """Return exit code based on scenario pattern.
243 Returns:
244 0 for success, 1 for failure, 2 for error based on scenario logic
245 """
246 if self.scenario == "all-pass":
247 return 0
248 elif self.scenario == "all-fail":
249 return 1
250 elif self.scenario == "all-error":
251 return 2
252 elif self.scenario == "first-fail":
253 # First call fails, rest pass
254 return 1 if self.call_count == 1 else 0
255 elif self.scenario == "alternating":
256 # Odd calls fail, even calls pass
257 return 1 if self.call_count % 2 == 1 else 0
258 return 0
260 def _prompt_result(self) -> int:
261 """Prompt user for simulated exit code.
263 Returns:
264 Exit code based on user selection
265 """
266 print()
267 print(" ? What should the simulated result be?")
268 print(" 1) Success (exit 0) [default]")
269 print(" 2) Failure (exit 1)")
270 print(" 3) Error (exit 2)")
272 while True:
273 try:
274 sys.stdout.write(" > ")
275 sys.stdout.flush()
276 choice = sys.stdin.readline().strip()
277 if choice in ("1", ""):
278 return 0
279 elif choice == "2":
280 return 1
281 elif choice == "3":
282 return 2
283 print(" Invalid choice. Enter 1, 2, or 3.")
284 except (EOFError, KeyboardInterrupt):
285 print()
286 return 0