Coverage for little_loops / fsm / runners.py: 90%

109 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""FSM action runner implementations. 

2 

3Provides the protocol and concrete implementations for action execution: 

4- ActionRunner: Protocol defining the runner interface 

5- DefaultActionRunner: Subprocess-based runner for real execution 

6- SimulationActionRunner: Interactive/scenario-based runner for testing and dry-runs 

7""" 

8 

9from __future__ import annotations 

10 

11import subprocess 

12import sys 

13import threading 

14import time 

15from collections.abc import Callable 

16from dataclasses import dataclass, field 

17from typing import Protocol 

18 

19from little_loops.fsm.types import ActionResult 

20from little_loops.subprocess_utils import run_claude_command 

21 

22 

def _now_ms() -> int:
    """Return the current wall-clock time as whole milliseconds.

    The value is derived from ``time.time()`` (seconds since the epoch) and
    truncated to an integer.
    """
    seconds_since_epoch = time.time()
    return int(seconds_since_epoch * 1000)

26 

27 

class ActionRunner(Protocol):
    """Structural interface that every FSM action runner satisfies."""

    def run(
        self,
        action: str,
        timeout: int,
        is_slash_command: bool,
        on_output_line: Callable[[str], None] | None = None,
        agent: str | None = None,
        tools: list[str] | None = None,
    ) -> ActionResult:
        """Run a single action and report how it went.

        Args:
            action: Command text to execute.
            timeout: Maximum run time, in seconds.
            is_slash_command: True when the action is a slash command
                (begins with ``/``).
            on_output_line: Optional callback called once per output line.
            agent: Optional agent name forwarded as ``--agent`` to the
                Claude CLI (prompt-mode only).
            tools: Optional tool names forwarded as a ``--tools`` CSV to the
                Claude CLI (prompt-mode only).

        Returns:
            An ActionResult carrying output, stderr, exit_code and
            duration_ms.
        """
        ...

54 

55 

class DefaultActionRunner:
    """Execute actions via subprocess or Claude CLI.

    Slash commands are delegated to ``run_claude_command``; everything else
    is run through ``bash -c``. While an action is running, the child process
    handle is exposed on ``_current_process`` and reset to ``None`` afterwards.
    """

    def __init__(self) -> None:
        # Handle of the child currently executing, or None when idle.
        self._current_process: subprocess.Popen[str] | None = None

    def run(
        self,
        action: str,
        timeout: int,
        is_slash_command: bool,
        on_output_line: Callable[[str], None] | None = None,
        agent: str | None = None,
        tools: list[str] | None = None,
    ) -> ActionResult:
        """Execute action and return result, streaming output line by line.

        Args:
            action: The command to execute
            timeout: Timeout in seconds
            is_slash_command: True if action starts with /
            on_output_line: Optional callback invoked for each stdout line
            agent: Optional agent name to pass as --agent to Claude CLI (prompt-mode only)
            tools: Optional list of tool names to pass as --tools CSV (prompt-mode only)

        Returns:
            ActionResult with execution details. A timed-out action is
            reported with exit_code 124 and duration_ms of ``timeout * 1000``.
        """
        start = _now_ms()
        if is_slash_command:
            return self._run_slash_command(
                action, timeout, on_output_line, agent, tools, start
            )
        return self._run_shell_command(action, timeout, on_output_line, start)

    def _run_slash_command(
        self,
        action: str,
        timeout: int,
        on_output_line: Callable[[str], None] | None,
        agent: str | None,
        tools: list[str] | None,
        start: int,
    ) -> ActionResult:
        """Run a slash command through the Claude CLI helper."""

        # Execute via Claude CLI using run_claude_command() so that the
        # subprocess loads the full plugin/tool context (including deferred
        # tools like Skill). The old --no-session-persistence path prevented
        # ToolSearch from resolving deferred tool schemas (BUG-946).
        def _stream_cb(line: str, is_stderr: bool) -> None:
            # Only stdout lines are forwarded to the caller's callback.
            if not is_stderr and on_output_line:
                on_output_line(line)

        def _on_proc_start(p: subprocess.Popen[str]) -> None:
            self._current_process = p

        def _on_proc_end(p: subprocess.Popen[str]) -> None:
            self._current_process = None

        try:
            completed = run_claude_command(
                command=action,
                timeout=timeout,
                stream_callback=_stream_cb,
                on_process_start=_on_proc_start,
                on_process_end=_on_proc_end,
                agent=agent,
                tools=tools,
            )
        except subprocess.TimeoutExpired:
            return ActionResult(
                output="",
                stderr="Action timed out",
                exit_code=124,
                duration_ms=timeout * 1000,
            )
        return ActionResult(
            output=completed.stdout,
            stderr=completed.stderr,
            exit_code=completed.returncode,
            duration_ms=_now_ms() - start,
        )

    def _run_shell_command(
        self,
        action: str,
        timeout: int,
        on_output_line: Callable[[str], None] | None,
        start: int,
    ) -> ActionResult:
        """Run a shell command via ``bash -c``, enforcing the timeout while streaming."""
        process = subprocess.Popen(
            ["bash", "-c", action],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        self._current_process = process
        output_chunks: list[str] = []
        stderr_chunks: list[str] = []
        timed_out = threading.Event()

        def _drain_stderr() -> None:
            # Drained on a separate thread so a chatty stderr cannot fill its
            # pipe and stall the child while stdout is being read.
            assert process.stderr is not None
            for line in process.stderr:
                stderr_chunks.append(line)

        def _kill_on_timeout() -> None:
            # Reading stdout below blocks until EOF, so the deadline has to be
            # enforced from outside the read loop. Killing the child closes
            # its end of the pipe, which lets the loop finish.
            if process.poll() is None:
                timed_out.set()
                process.kill()

        stderr_thread = threading.Thread(target=_drain_stderr, daemon=True)
        stderr_thread.start()

        watchdog = threading.Timer(timeout, _kill_on_timeout)
        watchdog.daemon = True
        watchdog.start()

        try:
            for line in process.stdout:  # type: ignore[union-attr]
                output_chunks.append(line)
                if on_output_line:
                    on_output_line(line.rstrip())
            # stdout is closed; the child may still be running.
            process.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            timed_out.set()
            process.kill()
            process.wait()
        finally:
            watchdog.cancel()
            # Do not leave the child running if we exit abnormally, e.g. when
            # the output callback raised.
            if process.poll() is None:
                process.kill()
                process.wait()
            self._current_process = None

        # Bounded join: a descendant holding the pipe open must not hang us.
        stderr_thread.join(timeout=5)

        if timed_out.is_set():
            return ActionResult(
                output="".join(output_chunks),
                stderr="".join(stderr_chunks) or "Action timed out",
                exit_code=124,
                duration_ms=timeout * 1000,
            )
        return ActionResult(
            output="".join(output_chunks),
            stderr="".join(stderr_chunks),
            exit_code=process.returncode,
            duration_ms=_now_ms() - start,
        )

171 

172 

@dataclass
class SimulationActionRunner:
    """Runner that simulates actions instead of executing them.

    Lets a user walk through FSM logic without running real commands. The
    exit code of each simulated action comes either from a predefined
    scenario or from an interactive prompt.

    Attributes:
        scenario: Predefined result pattern ("all-pass", "all-fail",
            "all-error", "first-fail", "alternating"); when unset, the user
            is prompted for each result.
        call_count: Number of actions simulated so far
        calls: List of all actions that would have been executed
    """

    scenario: str | None = None
    call_count: int = 0
    calls: list[str] = field(default_factory=list)

    def run(
        self,
        action: str,
        timeout: int,
        is_slash_command: bool,
        on_output_line: Callable[[str], None] | None = None,
        agent: str | None = None,
        tools: list[str] | None = None,
    ) -> ActionResult:
        """Record the action and return a simulated result.

        Args:
            action: The command that would be executed
            timeout: Ignored in simulation
            is_slash_command: Whether this is a slash command
            on_output_line: Ignored in simulation
            agent: Ignored in simulation
            tools: Ignored in simulation

        Returns:
            ActionResult with the simulated exit code, placeholder output,
            empty stderr and zero duration
        """
        del timeout, on_output_line, agent, tools  # unused in simulation
        self.call_count += 1
        self.calls.append(action)

        if is_slash_command:
            cmd_type = "slash command"
        else:
            cmd_type = "shell command"
        print(f" [SIMULATED] Would execute ({cmd_type}): {action}")

        if not self.scenario:
            exit_code = self._prompt_result()
        else:
            exit_code = self._scenario_result()
            if self.scenario == "all-pass":
                scenario_label = "Success (scenario: all-pass)"
            elif self.scenario == "all-fail":
                scenario_label = "Failure (scenario: all-fail)"
            elif self.scenario == "all-error":
                scenario_label = "Error (scenario: all-error)"
            elif self.scenario == "first-fail" and self.call_count == 1:
                scenario_label = "Failure"
            elif self.scenario == "alternating" and self.call_count % 2 == 1:
                scenario_label = "Failure"
            else:
                # Passing calls of the patterned scenarios, and any
                # unrecognised scenario name, are reported as success.
                scenario_label = "Success"
            print(f" [AUTO] Result: {scenario_label}")

        return ActionResult(
            output=f"[simulated output for: {action}]",
            stderr="",
            exit_code=exit_code,
            duration_ms=0,
        )

    def _scenario_result(self) -> int:
        """Map the configured scenario and current call count to an exit code.

        Returns:
            0 for success, 1 for failure, 2 for error; unrecognised
            scenarios yield 0
        """
        constant_codes = {"all-pass": 0, "all-fail": 1, "all-error": 2}
        if self.scenario in constant_codes:
            return constant_codes[self.scenario]
        if self.scenario == "first-fail":
            # Only the very first call fails.
            return 1 if self.call_count == 1 else 0
        if self.scenario == "alternating":
            # Odd-numbered calls fail, even-numbered calls pass.
            return 1 if self.call_count % 2 == 1 else 0
        return 0

    def _prompt_result(self) -> int:
        """Ask the user on stdin which simulated exit code to use.

        An empty answer selects the default (success). EOF or Ctrl-C also
        yields success.

        Returns:
            Exit code based on user selection
        """
        menu = (
            " ? What should the simulated result be?",
            " 1) Success (exit 0) [default]",
            " 2) Failure (exit 1)",
            " 3) Error (exit 2)",
        )
        print()
        for entry in menu:
            print(entry)

        exit_codes = {"": 0, "1": 0, "2": 1, "3": 2}
        while True:
            try:
                sys.stdout.write(" > ")
                sys.stdout.flush()
                answer = sys.stdin.readline().strip()
                if answer in exit_codes:
                    return exit_codes[answer]
                print(" Invalid choice. Enter 1, 2, or 3.")
            except (EOFError, KeyboardInterrupt):
                print()
                return 0

286 return 0