Coverage for little_loops / fsm / executor.py: 97%

576 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""FSM Executor - Runtime engine for FSM loop execution. 

2 

3This module provides the execution engine that runs FSM loops: 

4- Executes actions (shell commands or slash commands) 

5- Evaluates results using appropriate evaluators 

6- Routes to next states based on verdicts 

7- Tracks iteration count and enforces limits 

8- Manages captured variables and context 

9""" 

10 

11from __future__ import annotations 

12 

13import json 

14import random 

15import subprocess 

16import threading 

17import time 

18from collections.abc import Callable 

19from dataclasses import dataclass 

20from datetime import UTC, datetime 

21from pathlib import Path 

22from typing import Any 

23 

24from little_loops.fsm.evaluators import ( 

25 EvaluationResult, 

26 evaluate, 

27 evaluate_exit_code, 

28 evaluate_llm_structured, 

29 evaluate_mcp_result, 

30) 

31from little_loops.fsm.handoff_handler import HandoffHandler 

32from little_loops.fsm.interpolation import ( 

33 InterpolationContext, 

34 InterpolationError, 

35 interpolate, 

36 interpolate_dict, 

37) 

38from little_loops.fsm.rate_limit_circuit import RateLimitCircuit 

39from little_loops.fsm.runners import ( 

40 ActionRunner, 

41 DefaultActionRunner, 

42 SimulationActionRunner, # noqa: F401 — re-exported for backward compatibility 

43 _now_ms, 

44) 

45from little_loops.fsm.schema import FSMLoop, StateConfig 

46from little_loops.fsm.signal_detector import DetectedSignal, SignalDetector 

47from little_loops.fsm.types import ActionResult, Evaluator, EventCallback, ExecutionResult 

48from little_loops.issue_lifecycle import FailureType, classify_failure 

49from little_loops.session_log import get_current_session_jsonl 

50 

# --- Rate-limit (429 / quota) retry policy -----------------------------------
# Per-state retry budget in the short-burst tier; once spent, rate_limit_exhausted
# is emitted.
_DEFAULT_RATE_LIMIT_RETRIES: int = 3
# Base backoff in seconds. Sleep for attempt k = base * 2^(k-1) + uniform(0, base).
_DEFAULT_RATE_LIMIT_BACKOFF_BASE: int = 30
# Overall wall-clock budget (short + long tiers) before routing to
# on_rate_limit_exhausted. Mirrors the RateLimitsConfig.max_wait_seconds default (6h).
_DEFAULT_RATE_LIMIT_MAX_WAIT_SECONDS: int = 6 * 60 * 60
# Long-wait ladder (5, 15, 30, 60 minutes) walked once the short tier is spent.
# Mirrors the RateLimitsConfig.long_wait_ladder default.
_DEFAULT_RATE_LIMIT_LONG_WAIT_LADDER: list[int] = [5 * 60, 15 * 60, 30 * 60, 60 * 60]
# Emitted when the per-state rate-limit retries have been used up.
RATE_LIMIT_EXHAUSTED_EVENT: str = "rate_limit_exhausted"
# Emitted when consecutive exhaustions reach _RATE_LIMIT_STORM_THRESHOLD.
RATE_LIMIT_STORM_EVENT: str = "rate_limit_storm"
# Heartbeat emitted periodically during a long-wait sleep so UIs can show progress.
RATE_LIMIT_WAITING_EVENT: str = "rate_limit_waiting"
# Seconds between rate_limit_waiting heartbeats.
_RATE_LIMIT_HEARTBEAT_INTERVAL: float = 60.0
# How many back-to-back rate_limit_exhausted events count as a storm.
_RATE_LIMIT_STORM_THRESHOLD: int = 3

# --- Progressive tool-call throttle (per continuous state visit) --------------
# Calls 1..normal_max pass through; the warn_max-th call emits a warning; the
# hard_max-th call routes to on_throttle_hard; anything beyond hard_max stops.
_DEFAULT_THROTTLE_NORMAL_MAX: int = 3
_DEFAULT_THROTTLE_WARN_MAX: int = 8
_DEFAULT_THROTTLE_HARD_MAX: int = 12
THROTTLE_WARN_EVENT: str = "throttle_warn"
THROTTLE_HARD_EVENT: str = "throttle_hard"
THROTTLE_STOP_EVENT: str = "throttle_stop"

# --- LLM circuit breaker -------------------------------------------------------
# Action types that spend LLM quota and are therefore gated by the shared circuit.
# `_action_mode()` maps both to "prompt"; this set records the intent explicitly.
LLM_ACTION_TYPES: frozenset[str] = frozenset({"slash_command", "prompt"})

# --- API server error retry policy --------------------------------------------
# Per-state retries before falling back to normal verdict routing.
_DEFAULT_API_ERROR_RETRIES: int = 2
# Fixed (non-exponential) pause in seconds between API server error retries.
_DEFAULT_API_ERROR_BACKOFF: int = 30

87 

88 

89def _iso_now() -> str: 

90 """Get current time as ISO 8601 string.""" 

91 return datetime.now(UTC).isoformat() 

92 

93 

@dataclass()
class RouteContext:
    """Snapshot of a routing point, handed to interceptor before_route / after_route hooks."""

    # Name of the state that just executed.
    state_name: str
    # Configuration of that state.
    state: StateConfig
    # Verdict used for routing ("yes" when no evaluation result was produced).
    verdict: str
    # Result of the state's action, or None when the state had no action.
    action_result: ActionResult | None
    # Evaluator output, or None when evaluation produced nothing.
    eval_result: EvaluationResult | None
    # Interpolation context used to resolve route targets.
    ctx: InterpolationContext
    # Executor iteration counter at the time of routing.
    iteration: int

105 

106 

@dataclass()
class RouteDecision:
    """Value a before_route interceptor returns to override normal routing.

    How before_route results are interpreted:
        None (or any non-RouteDecision) → passthrough; routing proceeds as usual.
        RouteDecision("state")          → redirect; _route() is skipped and
                                          "state" becomes the next state.
        RouteDecision(None)             → veto; _execute_state() returns None,
                                          which ends the run via _finish("error").
    """

    # Redirect target, or None to veto the transition entirely.
    next_state: str | None

118 

119 

120class FSMExecutor: 

121 """Execute an FSM loop. 

122 

123 The executor runs an FSM from its initial state until: 

124 - A terminal state is reached 

125 - max_iterations is exceeded 

126 - A timeout occurs 

127 - A shutdown signal is received 

128 - An unrecoverable error occurs 

129 

130 Events are emitted via the callback for observability. 

131 """ 

132 

133 def __init__( 

134 self, 

135 fsm: FSMLoop, 

136 event_callback: EventCallback | None = None, 

137 action_runner: ActionRunner | None = None, 

138 signal_detector: SignalDetector | None = None, 

139 handoff_handler: HandoffHandler | None = None, 

140 loops_dir: Path | None = None, 

141 circuit: RateLimitCircuit | None = None, 

142 ): 

143 """Initialize the executor. 

144 

145 Args: 

146 fsm: The FSM loop to execute 

147 event_callback: Optional callback for events 

148 action_runner: Optional custom action runner (for testing) 

149 signal_detector: Optional signal detector for output parsing 

150 handoff_handler: Optional handler for handoff signals 

151 loops_dir: Base directory for resolving sub-loop references 

152 circuit: Optional shared rate-limit circuit breaker for 429 coordination 

153 """ 

154 self.fsm = fsm 

155 self.event_callback = event_callback or (lambda _: None) 

156 self.action_runner: ActionRunner = action_runner or DefaultActionRunner() 

157 self.signal_detector = signal_detector 

158 self.handoff_handler = handoff_handler 

159 self.loops_dir = loops_dir 

160 self._circuit = circuit 

161 

162 # Runtime state 

163 self.current_state = fsm.initial 

164 self.iteration = 0 

165 self.captured: dict[str, dict[str, Any]] = {} 

166 self.prev_result: dict[str, Any] | None = None 

167 self.started_at = "" 

168 self.start_time_ms = 0 

169 self.elapsed_offset_ms = ( 

170 0 # milliseconds from segments before current run (set by PersistentExecutor on resume) 

171 ) 

172 

173 # Shutdown flag for graceful signal handling 

174 self._shutdown_requested = False 

175 

176 # Currently running MCP subprocess (set by _run_subprocess, cleared in finally). 

177 # Enables external shutdown code to kill the process on SIGTERM. 

178 self._current_process: subprocess.Popen[str] | None = None 

179 

180 # Pending handoff signal (set by _run_action, checked by main loop) 

181 self._pending_handoff: DetectedSignal | None = None 

182 

183 # Pending error payload from FATAL_ERROR signal (set by _run_action, checked by main loop) 

184 self._pending_error: str | None = None 

185 

186 # Per-state retry tracking for max_retries support. 

187 # _retry_counts[state_name] = number of consecutive re-entries into that state. 

188 # Incremented each time we enter the same state as the previous iteration. 

189 # Reset when a different state is entered, or after retry exhaustion. 

190 self._retry_counts: dict[str, int] = {} 

191 # State entered in the previous iteration (None on first iteration or after resume). 

192 self._prev_state: str | None = None 

193 

194 # BUG-1226: true between emitting a `route` event and entering the 

195 # target state. Gates the "flush one pending shell state on timeout" 

196 # behavior so we only flush when there is actually a pending state. 

197 self._just_routed: bool = False 

198 

199 # Per-state rate-limit retry tracking (parallel to _retry_counts). 

200 # _rate_limit_retries[state_name] = dict-of-record: 

201 # { 

202 # "short_retries": int, # attempts in short-burst tier 

203 # "long_retries": int, # attempts in long-wait tier 

204 # "total_wait_seconds": float, 

205 # "first_seen_at": float | None, # epoch timestamp of first 429 

206 # } 

207 # Incremented inside _handle_rate_limit on each detected rate-limit response. 

208 # Reset when the state completes without a rate-limit, or after exhaustion. 

209 self._rate_limit_retries: dict[str, dict[str, Any]] = {} 

210 

211 # Consecutive rate_limit_exhausted emissions across all states. Reset on any 

212 # successful non-rate-limited state transition. When this reaches 

213 # _RATE_LIMIT_STORM_THRESHOLD, a RATE_LIMIT_STORM event is emitted. 

214 self._consecutive_rate_limit_exhaustions: int = 0 

215 

216 # Per-state API server error retry tracking (parallel to _rate_limit_retries). 

217 # _api_error_retries[state_name] = {"retries": int, "total_wait": float} 

218 # Reset when the state completes without a server error, or after exhaustion. 

219 self._api_error_retries: dict[str, dict[str, Any]] = {} 

220 

221 # Per-state tool-call throttle counter. Counts successive action executions within 

222 # a single continuous state visit. Reset on state exit; NOT serialized to LoopState 

223 # (throttle counts measure instantaneous visit-level activity, not cumulative retries). 

224 self._throttle_counts: dict[str, int] = {} 

225 

226 # Per-edge revisit counter for cycle detection. 

227 # _edge_revisit_counts["from_state->to_state"] = number of times that edge has fired. 

228 # When any edge exceeds max_edge_revisits, the loop terminates with cycle_detected. 

229 self._edge_revisit_counts: dict[str, int] = {} 

230 

231 # Nesting depth for sub-loop event forwarding (0 = top-level, 1+ = sub-loop). 

232 # Set by the parent executor when constructing child executors. 

233 self._depth: int = 0 

234 

235 # Extension hook registries — populated by wire_extensions() 

236 self._contributed_actions: dict[str, ActionRunner] = {} 

237 self._contributed_evaluators: dict[str, Evaluator] = {} 

238 self._interceptors: list[Any] = [] 

239 

240 def request_shutdown(self) -> None: 

241 """Request graceful shutdown of the executor. 

242 

243 Sets a flag that will be checked at the start of each iteration, 

244 allowing the loop to exit cleanly after the current state completes. 

245 """ 

246 self._shutdown_requested = True 

247 

248 def run(self) -> ExecutionResult: 

249 """Execute the FSM until terminal state or limits reached. 

250 

251 Returns: 

252 ExecutionResult with final state and execution metadata 

253 """ 

254 self.started_at = _iso_now() 

255 self.start_time_ms = _now_ms() 

256 

257 self._emit("loop_start", {"loop": self.fsm.name}) 

258 

259 try: 

260 while True: 

261 # Check shutdown request (signal handling) 

262 if self._shutdown_requested: 

263 return self._finish("signal") 

264 

265 # Check iteration limit 

266 if self.iteration >= self.fsm.max_iterations: 

267 return self._finish("max_iterations") 

268 

269 # Check timeout 

270 if self.fsm.timeout: 

271 elapsed = _now_ms() - self.start_time_ms + self.elapsed_offset_ms 

272 if elapsed > self.fsm.timeout * 1000: 

273 # BUG-1226: if timeout fires in the race window between 

274 # a `route` event and `state_enter`, flush one pending 

275 # shell-action state before honoring the timeout so its 

276 # side effect (e.g. copying a handshake flag) is not 

277 # silently lost. Bounded to shell actions — slash 

278 # commands and sub-loops would violate the timeout 

279 # budget. Single-step: no cascade. 

280 if self._just_routed: 

281 pending = self.fsm.states.get(self.current_state) 

282 if ( 

283 pending is not None 

284 and not pending.terminal 

285 and pending.loop is None 

286 and pending.action is not None 

287 and self._action_mode(pending) == "shell" 

288 ): 

289 self._flush_pending_shell_state(pending) 

290 return self._finish("timeout") 

291 

292 # Get current state config 

293 state_config = self.fsm.states[self.current_state] 

294 

295 # Update per-state retry tracking based on transition from previous iteration. 

296 # If re-entering the same state consecutively, increment retry count. 

297 # If entering a different state, clear the previous state's retry count. 

298 if self._prev_state is not None: 

299 if self.current_state == self._prev_state: 

300 self._retry_counts[self.current_state] = ( 

301 self._retry_counts.get(self.current_state, 0) + 1 

302 ) 

303 else: 

304 self._retry_counts.pop(self._prev_state, None) 

305 self._throttle_counts.pop(self._prev_state, None) 

306 

307 # Check terminal 

308 if state_config.terminal: 

309 # Handle maintain mode - restart loop instead of terminating 

310 if self.fsm.maintain: 

311 self.iteration += 1 

312 maintain_target = state_config.on_maintain or self.fsm.initial 

313 self._emit( 

314 "route", 

315 { 

316 "from": self.current_state, 

317 "to": maintain_target, 

318 "reason": "maintain", 

319 }, 

320 ) 

321 self._prev_state = self.current_state 

322 self.current_state = maintain_target 

323 self._just_routed = True 

324 continue 

325 return self._finish("terminal") 

326 

327 # Check per-state retry limit. If the consecutive re-entry count exceeds 

328 # max_retries, skip execution and route to on_retry_exhausted instead. 

329 if state_config.max_retries is not None: 

330 retry_count = self._retry_counts.get(self.current_state, 0) 

331 if retry_count > state_config.max_retries: 

332 # on_retry_exhausted is guaranteed non-None by validation when 

333 # max_retries is set, but we fall back to an error if misconfigured. 

334 exhausted_state: str = state_config.on_retry_exhausted or "" 

335 if not exhausted_state: 

336 return self._finish( 

337 "error", 

338 error=f"State '{self.current_state}' exceeded max_retries " 

339 "but on_retry_exhausted is not set", 

340 ) 

341 self._emit( 

342 "retry_exhausted", 

343 { 

344 "state": self.current_state, 

345 "retries": retry_count, 

346 "next": exhausted_state, 

347 }, 

348 ) 

349 self._retry_counts.pop(self.current_state, None) 

350 self._prev_state = self.current_state 

351 self.current_state = exhausted_state 

352 continue 

353 

354 self.iteration += 1 

355 self._just_routed = False 

356 self._emit( 

357 "state_enter", 

358 { 

359 "state": self.current_state, 

360 "iteration": self.iteration, 

361 }, 

362 ) 

363 

364 # Execute state 

365 next_state = self._execute_state(state_config) 

366 

367 # Check for pending error signal (FATAL_ERROR) 

368 if self._pending_error is not None: 

369 return self._finish("error", error=self._pending_error) 

370 

371 # Check for pending handoff signal 

372 if self._pending_handoff: 

373 return self._handle_handoff(self._pending_handoff) 

374 

375 # Handle maintain mode 

376 if next_state is None and self.fsm.maintain: 

377 next_state = state_config.on_maintain or self.fsm.initial 

378 

379 # SIGKILL in _execute_state sets shutdown flag and returns None 

380 if next_state is None and self._shutdown_requested: 

381 return self._finish("signal") 

382 

383 if next_state is None: 

384 return self._finish("error", error="No valid transition") 

385 

386 # At this point next_state is guaranteed to be str 

387 resolved_next: str = next_state 

388 

389 self._emit( 

390 "route", 

391 { 

392 "from": self.current_state, 

393 "to": resolved_next, 

394 }, 

395 ) 

396 

397 # Per-edge revisit tracking for cycle detection. 

398 edge_key = f"{self.current_state}->{resolved_next}" 

399 self._edge_revisit_counts[edge_key] = self._edge_revisit_counts.get(edge_key, 0) + 1 

400 if self._edge_revisit_counts[edge_key] > self.fsm.max_edge_revisits: 

401 self._emit( 

402 "cycle_detected", 

403 { 

404 "edge": edge_key, 

405 "from": self.current_state, 

406 "to": resolved_next, 

407 "count": self._edge_revisit_counts[edge_key], 

408 "max": self.fsm.max_edge_revisits, 

409 }, 

410 ) 

411 return self._finish( 

412 "cycle_detected", 

413 error=f"Cycle detected: edge {edge_key} traversed " 

414 f"{self._edge_revisit_counts[edge_key]} times " 

415 f"(limit: {self.fsm.max_edge_revisits})", 

416 ) 

417 

418 self._prev_state = self.current_state 

419 self.current_state = resolved_next 

420 self._just_routed = True 

421 

422 # Interruptible backoff sleep between iterations 

423 if self.fsm.backoff and self.fsm.backoff > 0: 

424 deadline = time.time() + self.fsm.backoff 

425 while time.time() < deadline: 

426 if self._shutdown_requested: 

427 break 

428 time.sleep(min(0.1, deadline - time.time())) 

429 

430 except InterpolationError as exc: 

431 return self._finish( 

432 "error", 

433 error=( 

434 f"Missing context variable in state '{self.current_state}': {exc}. " 

435 f"Run with: ll-loop run {self.fsm.name} --context KEY=VALUE" 

436 ), 

437 ) 

438 except Exception as exc: 

439 return self._finish("error", error=str(exc)) 

440 

441 def _execute_sub_loop(self, state: StateConfig, ctx: InterpolationContext) -> str | None: 

442 """Execute a sub-loop state by loading and running a child FSM. 

443 

444 Args: 

445 state: The state configuration with loop field set 

446 ctx: Interpolation context for routing 

447 

448 Returns: 

449 Next state name based on child loop verdict, or None 

450 """ 

451 from little_loops.cli.loop._helpers import resolve_loop_path 

452 from little_loops.fsm.validation import load_and_validate 

453 

454 assert state.loop is not None # guarded by caller 

455 loop_name = interpolate(state.loop, ctx) 

456 loop_path = resolve_loop_path(loop_name, self.loops_dir or Path(".loops")) 

457 child_fsm, _ = load_and_validate(loop_path) 

458 

459 # Bind child context: explicit with: bindings take precedence over legacy passthrough 

460 if state.with_: 

461 from little_loops.fsm.interpolation import interpolate_dict 

462 

463 resolved = interpolate_dict(state.with_, ctx) 

464 # Apply declared defaults for unbound optional parameters 

465 for param_name, param_spec in child_fsm.parameters.items(): 

466 if ( 

467 param_name not in resolved 

468 and not param_spec.required 

469 and param_spec.default is not None 

470 ): 

471 resolved[param_name] = param_spec.default 

472 # Runtime check: required parameters must be present after interpolation 

473 for param_name, param_spec in child_fsm.parameters.items(): 

474 if param_spec.required and param_name not in resolved: 

475 raise ValueError( 

476 f"Sub-loop '{state.loop}' requires parameter '{param_name}' " 

477 f"but it is not bound in 'with'" 

478 ) 

479 # Merge: child's own context block provides base; with: bindings override 

480 child_fsm.context = {**child_fsm.context, **resolved} 

481 elif state.context_passthrough: 

482 # Extract .output strings from capture result dicts so ${context.key} resolves 

483 # to the plain output string (e.g. "ENH-123") rather than the full capture object. 

484 captured_as_context = { 

485 k: v["output"] if isinstance(v, dict) and "exit_code" in v else v 

486 for k, v in self.captured.items() 

487 } 

488 child_fsm.context = {**self.fsm.context, **captured_as_context, **child_fsm.context} 

489 

490 depth = self._depth + 1 

491 child_events: list[dict] = [] 

492 

493 def _sub_event_callback(event: dict) -> None: 

494 child_events.append(event) 

495 # Only inject depth if not already set by a deeper nested sub-loop 

496 if "depth" not in event: 

497 self.event_callback({**event, "depth": depth}) 

498 else: 

499 self.event_callback(event) 

500 

501 child_executor = FSMExecutor( 

502 child_fsm, 

503 action_runner=self.action_runner, 

504 loops_dir=self.loops_dir, 

505 event_callback=_sub_event_callback, 

506 circuit=self._circuit, 

507 ) 

508 child_executor._depth = depth # propagate depth for further nesting 

509 

510 # Clamp child timeout to parent's remaining wall-clock budget so a slow sub-loop 

511 # can't silently consume the parent's deadline with no recourse for the parent FSM. 

512 if self.fsm.timeout: 

513 elapsed_ms = _now_ms() - self.start_time_ms + self.elapsed_offset_ms 

514 remaining_s = max(1, int((self.fsm.timeout * 1000 - elapsed_ms) // 1000)) 

515 if child_fsm.timeout is None or child_fsm.timeout > remaining_s: 

516 child_fsm.timeout = remaining_s 

517 

518 child_result = child_executor.run() 

519 

520 # Capture child event stream as a JSON-lines string if the state declares a capture key 

521 if state.capture and child_events: 

522 import json as _json 

523 

524 self.captured[state.capture] = { 

525 "output": "\n".join(_json.dumps(e) for e in child_events), 

526 "exit_code": None, 

527 } 

528 

529 # Merge child captures back into parent under the state name 

530 if (state.context_passthrough or state.with_) and child_executor.captured: 

531 self.captured[self.current_state] = child_executor.captured 

532 

533 # Route based on child termination reason and terminal state name 

534 if child_result.terminated_by == "terminal": 

535 if child_result.final_state == "done": 

536 return interpolate(state.on_yes, ctx) if state.on_yes else None 

537 else: 

538 # Reached a non-done terminal (e.g. "failed") → failure 

539 return interpolate(state.on_no, ctx) if state.on_no else None 

540 elif child_result.terminated_by == "error": 

541 # Runtime child failure (not a YAML load error) 

542 if state.on_error: 

543 return interpolate(state.on_error, ctx) 

544 return interpolate(state.on_no, ctx) if state.on_no else None 

545 else: 

546 # max_iterations, timeout, signal — all are failure 

547 return interpolate(state.on_no, ctx) if state.on_no else None 

548 

549 def _execute_learning_state(self, state: StateConfig, ctx: InterpolationContext) -> str | None: 

550 """Execute a FEAT-1283 ``type: learning`` state. 

551 

552 Iterates ``state.learning.targets`` in order. For each target: 

553 1. Look up its record in the learning-tests registry (ENH-1282). 

554 2. If proven → continue. 

555 3. If refuted → emit ``learning_target_refuted`` + ``learning_blocked`` 

556 and route to ``on_blocked`` (preferred) or ``on_no``. 

557 4. If missing or stale → emit ``learning_target_stale`` and invoke 

558 ``/ll:explore-api <target>`` via the executor's action_runner; 

559 re-check the registry; repeat up to ``max_retries`` times before 

560 emitting ``learning_blocked`` (reason ``retries_exhausted``) and 

561 routing to ``on_blocked``/``on_no``. 

562 

563 When every target ends up proven, emit ``learning_complete`` and route 

564 to ``on_yes``. Returns the resolved next-state name (or ``None`` when no 

565 route is configured for the terminal verdict, mirroring ``_route``). 

566 """ 

567 from little_loops.learning_tests import check_learning_test 

568 

569 assert state.learning is not None # guarded by caller 

570 

571 def _blocked_target(reason: str, target: str) -> str | None: 

572 self._emit( 

573 "learning_blocked", 

574 {"state": self.current_state, "target": target, "reason": reason}, 

575 ) 

576 route = state.on_blocked or state.on_no 

577 return interpolate(route, ctx) if route else None 

578 

579 for target in state.learning.targets: 

580 record = check_learning_test(target) 

581 

582 attempts = 0 

583 while record is None or record.status == "stale": 

584 if attempts >= state.learning.max_retries: 

585 return _blocked_target("retries_exhausted", target) 

586 

587 if record is None: 

588 self._emit( 

589 "learning_target_stale", 

590 {"state": self.current_state, "target": target, "cause": "missing"}, 

591 ) 

592 else: 

593 self._emit( 

594 "learning_target_stale", 

595 {"state": self.current_state, "target": target, "cause": "stale"}, 

596 ) 

597 

598 self._emit( 

599 "learning_explore_invoked", 

600 {"state": self.current_state, "target": target, "attempt": attempts + 1}, 

601 ) 

602 self._run_action(f"/ll:explore-api {target}", state, ctx) 

603 attempts += 1 

604 record = check_learning_test(target) 

605 

606 if record.status == "refuted": 

607 self._emit( 

608 "learning_target_refuted", 

609 {"state": self.current_state, "target": target}, 

610 ) 

611 return _blocked_target("refuted", target) 

612 

613 self._emit( 

614 "learning_target_proven", 

615 {"state": self.current_state, "target": target}, 

616 ) 

617 

618 self._emit( 

619 "learning_complete", 

620 {"state": self.current_state, "targets": list(state.learning.targets)}, 

621 ) 

622 return interpolate(state.on_yes, ctx) if state.on_yes else None 

623 

624 def _check_throttle(self, state: StateConfig, state_name: str) -> str | None: 

625 """Increment the per-state tool-call counter and enforce throttle thresholds. 

626 

627 Called after every action execution within a state visit. Returns the forced 

628 next-state name when the hard threshold is reached, or None when execution 

629 should continue normally (warn events are emitted but do not redirect). 

630 

631 Sets self._pending_error and returns "__STOP__" when the call count exceeds 

632 hard_max with no on_throttle_hard target — the caller must propagate this as 

633 a None return from _execute_state so the main loop detects _pending_error. 

634 """ 

635 count = self._throttle_counts.get(state_name, 0) + 1 

636 self._throttle_counts[state_name] = count 

637 

638 throttle = state.throttle 

639 normal_max = ( 

640 throttle.normal_max 

641 if (throttle and throttle.normal_max is not None) 

642 else _DEFAULT_THROTTLE_NORMAL_MAX 

643 ) 

644 warn_max = ( 

645 throttle.warn_max 

646 if (throttle and throttle.warn_max is not None) 

647 else _DEFAULT_THROTTLE_WARN_MAX 

648 ) 

649 hard_max = ( 

650 throttle.hard_max 

651 if (throttle and throttle.hard_max is not None) 

652 else _DEFAULT_THROTTLE_HARD_MAX 

653 ) 

654 

655 if count == warn_max: 

656 self._emit( 

657 THROTTLE_WARN_EVENT, 

658 { 

659 "state": state_name, 

660 "count": count, 

661 "normal_max": normal_max, 

662 "warn_max": warn_max, 

663 "hard_max": hard_max, 

664 }, 

665 ) 

666 

667 # States with type="learning" (FEAT-1283) are exempt from hard_max — they 

668 # legitimately make N calls per visit (one per unproven target). 

669 if state.type == "learning": 

670 return None 

671 

672 if count == hard_max: 

673 next_target = state.on_throttle_hard or state.on_error 

674 self._emit( 

675 THROTTLE_HARD_EVENT, 

676 {"state": state_name, "count": count, "hard_max": hard_max, "next": next_target}, 

677 ) 

678 return next_target 

679 

680 if count > hard_max: 

681 self._emit( 

682 THROTTLE_STOP_EVENT, 

683 {"state": state_name, "count": count, "hard_max": hard_max}, 

684 ) 

685 self._pending_error = ( 

686 f"Throttle stop: state '{state_name}' exceeded hard_max={hard_max} " 

687 "tool calls with no on_throttle_hard target" 

688 ) 

689 return "__STOP__" 

690 

691 return None 

692 

693 def _execute_state(self, state: StateConfig) -> str | None: 

694 """Execute a single state and return next state name. 

695 

696 Args: 

697 state: The state configuration to execute 

698 

699 Returns: 

700 Next state name, or None if no valid transition 

701 """ 

702 # Build interpolation context 

703 ctx = self._build_context() 

704 

705 # Dispatch to sub-loop handler if this is a sub-loop state 

706 if state.loop is not None: 

707 try: 

708 return self._execute_sub_loop(state, ctx) 

709 except (FileNotFoundError, ValueError): 

710 if state.on_error: 

711 return interpolate(state.on_error, ctx) 

712 raise 

713 

714 # FEAT-1283: dispatch to learning-state handler when both type="learning" 

715 # AND a LearningConfig is present. The bare `type="learning"` marker 

716 # (used pre-FEAT-1283 only as a throttle hard_max exemption hint, see 

717 # ThrottleConfig docstring) falls through to normal action execution. 

718 if state.type == "learning" and state.learning is not None: 

719 return self._execute_learning_state(state, ctx) 

720 

721 # Handle unconditional transition 

722 if state.next: 

723 if state.action: 

724 self._maybe_wait_for_circuit(state) 

725 result, routed = self._run_action_or_route(state, ctx) 

726 if routed is not None: 

727 return routed 

728 throttle_next = self._check_throttle(state, self.current_state) 

729 if throttle_next == "__STOP__": 

730 return None 

731 if throttle_next is not None: 

732 return throttle_next 

733 assert result is not None 

734 self.prev_result = { 

735 "output": result.output, 

736 "exit_code": result.exit_code, 

737 "state": self.current_state, 

738 } 

739 if result.exit_code is not None and result.exit_code < 0: 

740 # Process killed by signal — do not silently advance via next 

741 if state.on_error: 

742 return interpolate(state.on_error, ctx) 

743 self.request_shutdown() 

744 return None 

745 # Non-zero exit: if on_error is defined, treat next as success path only 

746 if result.exit_code != 0 and state.on_error: 

747 return interpolate(state.on_error, ctx) 

748 return interpolate(state.next, ctx) 

749 

750 # Execute action if present 

751 action_result = None 

752 if state.action: 

753 self._maybe_wait_for_circuit(state) 

754 action_result, routed = self._run_action_or_route(state, ctx) 

755 if routed is not None: 

756 return routed 

757 throttle_next = self._check_throttle(state, self.current_state) 

758 if throttle_next == "__STOP__": 

759 return None 

760 if throttle_next is not None: 

761 return throttle_next 

762 

763 # Evaluate 

764 eval_result = self._evaluate(state, action_result, ctx) 

765 self.prev_result = { 

766 "output": action_result.output if action_result else "", 

767 "exit_code": action_result.exit_code if action_result else 0, 

768 "state": self.current_state, 

769 } 

770 

771 # Update context with result for routing interpolation 

772 if eval_result: 

773 ctx.result = { 

774 "verdict": eval_result.verdict, 

775 "details": eval_result.details, 

776 } 

777 

778 # Route based on verdict 

779 verdict = eval_result.verdict if eval_result else "yes" 

780 route_ctx = RouteContext( 

781 state_name=self.current_state, 

782 state=state, 

783 verdict=verdict, 

784 action_result=action_result, 

785 eval_result=eval_result, 

786 ctx=ctx, 

787 iteration=self.iteration, 

788 ) 

789 # 429 / rate-limit detection — runs before interceptors so an in-place retry 

790 # returns early without dispatching to registered before_route hooks. 

791 if action_result is not None: 

792 _combined = (action_result.output or "") + "\n" + (action_result.stderr or "") 

793 _failure_type, _reason = classify_failure(_combined, action_result.exit_code) 

794 if _failure_type == FailureType.TRANSIENT and ( 

795 "rate limit" in _reason.lower() or "quota" in _reason.lower() 

796 ): 

797 _handled, _target = self._handle_rate_limit(state, route_ctx.state_name) 

798 if _handled: 

799 return _target 

800 elif _failure_type == FailureType.TRANSIENT and "api server error" in _reason.lower(): 

801 _handled, _target = self._handle_api_error(state, route_ctx.state_name) 

802 if _handled: 

803 return _target 

804 # exhausted — fall through to normal verdict routing 

805 else: 

806 # Not rate-limited or server-error: reset counters so future transients start fresh. 

807 self._rate_limit_retries.pop(route_ctx.state_name, None) 

808 self._consecutive_rate_limit_exhaustions = 0 

809 self._api_error_retries.pop(route_ctx.state_name, None) 

810 

811 for interceptor in self._interceptors: 

812 if hasattr(interceptor, "before_route"): 

813 decision = interceptor.before_route(route_ctx) 

814 if isinstance(decision, RouteDecision): 

815 if decision.next_state is None: 

816 return None # veto 

817 return decision.next_state # redirect — bypass _route() 

818 next_state = self._route(state, verdict, ctx) 

819 for interceptor in self._interceptors: 

820 if hasattr(interceptor, "after_route"): 

821 interceptor.after_route(route_ctx) 

822 return next_state 

823 

824 def _run_action( 

825 self, 

826 action_template: str, 

827 state: StateConfig, 

828 ctx: InterpolationContext, 

829 ) -> ActionResult: 

830 """Execute action and optionally capture result. 

831 

832 Args: 

833 action_template: Action string (may contain variables) 

834 state: State configuration 

835 ctx: Interpolation context 

836 

837 Returns: 

838 ActionResult with output and exit code 

839 """ 

840 action = interpolate(action_template, ctx) 

841 action_mode = self._action_mode(state) 

842 

843 self._emit("action_start", {"action": action, "is_prompt": action_mode == "prompt"}) 

844 

845 def _on_line(line: str) -> None: 

846 self._emit("action_output", {"line": line}) 

847 

848 if action_mode == "mcp_tool": 

849 # Direct MCP tool call — bypass action_runner entirely 

850 interpolated_params = interpolate_dict(state.params, ctx) if state.params else {} 

851 cmd = ["mcp-call", action, json.dumps(interpolated_params)] 

852 result = self._run_subprocess( 

853 cmd, 

854 timeout=state.timeout or self.fsm.default_timeout or 30, 

855 on_output_line=_on_line, 

856 ) 

857 elif action_mode == "contributed": 

858 assert ( 

859 state.action_type is not None 

860 ) # guaranteed by _action_mode returning "contributed" 

861 runner = self._contributed_actions[state.action_type] 

862 result = runner.run( 

863 action, 

864 timeout=state.timeout or self.fsm.default_timeout or 3600, 

865 is_slash_command=False, 

866 on_output_line=_on_line, 

867 ) 

868 else: 

869 result = self.action_runner.run( 

870 action, 

871 timeout=state.timeout or self.fsm.default_timeout or 3600, 

872 is_slash_command=action_mode == "prompt", 

873 on_output_line=_on_line, 

874 agent=state.agent if action_mode == "prompt" else None, 

875 tools=state.tools if action_mode == "prompt" else None, 

876 ) 

877 

878 preview = result.output[-2000:].strip() if result.output else None 

879 payload: dict[str, Any] = { 

880 "exit_code": result.exit_code, 

881 "duration_ms": result.duration_ms, 

882 "output_preview": preview, 

883 "is_prompt": action_mode == "prompt", 

884 } 

885 if action_mode == "prompt": 

886 session_jsonl = get_current_session_jsonl() 

887 payload["session_jsonl"] = str(session_jsonl) if session_jsonl else None 

888 self._emit("action_complete", payload) 

889 

890 # Capture if requested 

891 if state.capture: 

892 self.captured[state.capture] = { 

893 "output": result.output.rstrip("\n\r"), 

894 "stderr": result.stderr, 

895 "exit_code": result.exit_code, 

896 "duration_ms": result.duration_ms, 

897 } 

898 

899 # Check for signals in output 

900 if self.signal_detector: 

901 signal = self.signal_detector.detect_first(result.output) 

902 if signal: 

903 if signal.signal_type == "handoff": 

904 self._pending_handoff = signal 

905 elif signal.signal_type == "error": 

906 self._pending_error = signal.payload 

907 elif signal.signal_type == "stop": 

908 self.request_shutdown() 

909 

910 return result 

911 

912 def _run_subprocess( 

913 self, 

914 cmd: list[str], 

915 timeout: int, 

916 on_output_line: Any | None = None, 

917 ) -> ActionResult: 

918 """Run a subprocess directly and return ActionResult. 

919 

920 Follows the same Popen + stderr-drain-thread pattern as DefaultActionRunner. 

921 

922 Args: 

923 cmd: Command and arguments to execute 

924 timeout: Timeout in seconds 

925 on_output_line: Optional callback for each stdout line 

926 

927 Returns: 

928 ActionResult with output, stderr, exit_code, duration_ms 

929 """ 

930 start = _now_ms() 

931 process = subprocess.Popen( 

932 cmd, 

933 stdout=subprocess.PIPE, 

934 stderr=subprocess.PIPE, 

935 text=True, 

936 ) 

937 self._current_process = process 

938 output_chunks: list[str] = [] 

939 stderr_chunks: list[str] = [] 

940 

941 def _drain_stderr() -> None: 

942 assert process.stderr is not None 

943 for line in process.stderr: 

944 stderr_chunks.append(line) 

945 

946 stderr_thread = threading.Thread(target=_drain_stderr, daemon=True) 

947 stderr_thread.start() 

948 

949 try: 

950 for line in process.stdout: # type: ignore[union-attr] 

951 output_chunks.append(line) 

952 if on_output_line: 

953 on_output_line(line.rstrip()) 

954 process.wait(timeout=timeout) 

955 except subprocess.TimeoutExpired: 

956 process.kill() 

957 process.wait() 

958 stderr_thread.join(timeout=5) 

959 return ActionResult( 

960 output="".join(output_chunks), 

961 stderr="".join(stderr_chunks) or "MCP call timed out", 

962 exit_code=124, 

963 duration_ms=timeout * 1000, 

964 ) 

965 finally: 

966 self._current_process = None 

967 stderr_thread.join(timeout=5) 

968 return ActionResult( 

969 output="".join(output_chunks), 

970 stderr="".join(stderr_chunks), 

971 exit_code=process.returncode, 

972 duration_ms=_now_ms() - start, 

973 ) 

974 

975 def _evaluate( 

976 self, 

977 state: StateConfig, 

978 action_result: ActionResult | None, 

979 ctx: InterpolationContext, 

980 ) -> EvaluationResult | None: 

981 """Evaluate action result. 

982 

983 Args: 

984 state: State configuration 

985 action_result: Result from action execution (may be None) 

986 ctx: Interpolation context 

987 

988 Returns: 

989 EvaluationResult, or None if no evaluation needed 

990 """ 

991 if state.evaluate is None: 

992 # Default evaluation based on action type 

993 if action_result: 

994 action_mode = self._action_mode(state) 

995 

996 if action_mode == "mcp_tool": 

997 # MCP tool call: use mcp_result evaluator 

998 result = evaluate_mcp_result(action_result.output, action_result.exit_code) 

999 elif action_mode == "prompt": 

1000 # Slash command or prompt: use LLM evaluation 

1001 if not self.fsm.llm.enabled: 

1002 result = EvaluationResult( 

1003 verdict="error", 

1004 details={"error": "LLM evaluation disabled via --no-llm"}, 

1005 ) 

1006 else: 

1007 result = evaluate_llm_structured( 

1008 action_result.output, 

1009 model=self.fsm.llm.model, 

1010 max_tokens=self.fsm.llm.max_tokens, 

1011 timeout=self.fsm.llm.timeout, 

1012 ) 

1013 else: 

1014 # Shell command: use exit code 

1015 result = evaluate_exit_code(action_result.exit_code) 

1016 

1017 self._emit( 

1018 "evaluate", 

1019 { 

1020 "type": "default", 

1021 "verdict": result.verdict, 

1022 **result.details, 

1023 }, 

1024 ) 

1025 return result 

1026 return None 

1027 

1028 # Explicit evaluation config 

1029 raw_output = action_result.output if action_result else "" 

1030 if state.evaluate.source: 

1031 try: 

1032 eval_input = interpolate(state.evaluate.source, ctx) 

1033 except InterpolationError: 

1034 eval_input = raw_output 

1035 else: 

1036 eval_input = raw_output 

1037 

1038 if state.evaluate.type in self._contributed_evaluators: 

1039 result = self._contributed_evaluators[state.evaluate.type]( 

1040 state.evaluate, 

1041 eval_input, 

1042 action_result.exit_code if action_result else 0, 

1043 ctx, 

1044 ) 

1045 elif state.evaluate.type == "llm_structured" and not self.fsm.llm.enabled: 

1046 result = EvaluationResult( 

1047 verdict="error", 

1048 details={"error": "LLM evaluation disabled via --no-llm"}, 

1049 ) 

1050 else: 

1051 result = evaluate( 

1052 config=state.evaluate, 

1053 output=eval_input, 

1054 exit_code=action_result.exit_code if action_result else 0, 

1055 context=ctx, 

1056 ) 

1057 

1058 self._emit( 

1059 "evaluate", 

1060 { 

1061 "type": state.evaluate.type, 

1062 "verdict": result.verdict, 

1063 **result.details, 

1064 }, 

1065 ) 

1066 

1067 return result 

1068 

1069 def _route( 

1070 self, 

1071 state: StateConfig, 

1072 verdict: str, 

1073 ctx: InterpolationContext, 

1074 ) -> str | None: 

1075 """Determine next state from verdict. 

1076 

1077 Resolution order (from design doc): 

1078 1. next (unconditional) - handled before this method 

1079 2. route (full routing table) 

1080 3. on_success/on_failure/on_error (shorthand) 

1081 4. terminal - handled in main loop 

1082 5. error 

1083 

1084 Args: 

1085 state: State configuration 

1086 verdict: Verdict string from evaluation 

1087 ctx: Interpolation context 

1088 

1089 Returns: 

1090 Next state name, or None if no valid route 

1091 """ 

1092 if state.route: 

1093 routes = state.route.routes 

1094 if verdict in routes: 

1095 return self._resolve_route(routes[verdict], ctx) 

1096 if state.route.default: 

1097 return self._resolve_route(state.route.default, ctx) 

1098 if verdict == "error" and state.route.error: 

1099 return self._resolve_route(state.route.error, ctx) 

1100 return None 

1101 

1102 # Shorthand routing 

1103 if verdict == "yes" and state.on_yes: 

1104 return self._resolve_route(state.on_yes, ctx) 

1105 if verdict == "no" and state.on_no: 

1106 return self._resolve_route(state.on_no, ctx) 

1107 if verdict == "error" and state.on_error: 

1108 return self._resolve_route(state.on_error, ctx) 

1109 if verdict == "partial" and state.on_partial: 

1110 return self._resolve_route(state.on_partial, ctx) 

1111 if verdict == "blocked" and state.on_blocked: 

1112 return self._resolve_route(state.on_blocked, ctx) 

1113 

1114 # Dynamic on_<verdict> shorthands from extra_routes 

1115 if verdict in state.extra_routes: 

1116 return self._resolve_route(state.extra_routes[verdict], ctx) 

1117 

1118 return None 

1119 

1120 def _resolve_route(self, route: str, ctx: InterpolationContext) -> str: 

1121 """Resolve route target, handling special tokens. 

1122 

1123 Args: 

1124 route: Route target string 

1125 ctx: Interpolation context 

1126 

1127 Returns: 

1128 Resolved state name 

1129 """ 

1130 if route == "$current": 

1131 return self.current_state 

1132 return interpolate(route, ctx) 

1133 

1134 def _flush_pending_shell_state(self, state: StateConfig) -> None: 

1135 """Execute a pending shell-action state's action before honoring a 

1136 wall-clock timeout. BUG-1226: closes the narrow race between emitting 

1137 a `route` event and `state_enter` so handshake states (e.g. autodev's 

1138 ``copy_broke_down``) do not silently drop their side effect when the 

1139 timeout fires in that window. Single-step: we run the action but do 

1140 not follow its routing — ``final_state`` stays as the flushed state. 

1141 """ 

1142 assert state.action is not None # guarded by caller 

1143 self.iteration += 1 

1144 self._just_routed = False 

1145 self._emit( 

1146 "state_enter", 

1147 { 

1148 "state": self.current_state, 

1149 "iteration": self.iteration, 

1150 "flushed": True, 

1151 }, 

1152 ) 

1153 ctx = self._build_context() 

1154 try: 

1155 self._run_action(state.action, state, ctx) 

1156 except Exception: 

1157 # Deliberately swallow — the timeout is being honored regardless 

1158 # of whether the flushed action succeeded. 

1159 pass 

1160 

1161 def _action_mode(self, state: StateConfig) -> str: 

1162 """Return execution mode for the state: 'prompt', 'shell', or 'mcp_tool'.""" 

1163 if state.action_type == "mcp_tool": 

1164 return "mcp_tool" 

1165 if state.action_type in ("prompt", "slash_command"): 

1166 return "prompt" 

1167 if state.action_type == "shell": 

1168 return "shell" 

1169 if state.action_type in self._contributed_actions: 

1170 return "contributed" 

1171 # Heuristic: / prefix = slash_command (prompt mode) 

1172 if state.action is not None and state.action.startswith("/"): 

1173 return "prompt" 

1174 return "shell" 

1175 

1176 def _run_action_or_route( 

1177 self, state: StateConfig, ctx: InterpolationContext 

1178 ) -> tuple[ActionResult | None, str | None]: 

1179 """Run the state action, routing unhandled exceptions to on_error. 

1180 

1181 Returns (action_result, routed_target). ``routed_target`` is a non-None 

1182 next-state string only when an exception was raised AND ``state.on_error`` 

1183 is defined; in that case ``action_result`` is None. When no on_error is 

1184 set, the exception is re-raised for the top-level ``run()`` handler. 

1185 """ 

1186 assert state.action is not None # caller-guarded 

1187 try: 

1188 return self._run_action(state.action, state, ctx), None 

1189 except Exception as exc: 

1190 if state.on_error: 

1191 self._emit( 

1192 "action_error", 

1193 { 

1194 "state": self.current_state, 

1195 "error": str(exc), 

1196 "route": "on_error", 

1197 }, 

1198 ) 

1199 return None, interpolate(state.on_error, ctx) 

1200 raise 

1201 

1202 def _build_context(self) -> InterpolationContext: 

1203 """Build interpolation context for current state. 

1204 

1205 Returns: 

1206 InterpolationContext with all runtime values 

1207 """ 

1208 return InterpolationContext( 

1209 context=self.fsm.context, 

1210 captured=self.captured, 

1211 prev=self.prev_result, 

1212 result=None, 

1213 state_name=self.current_state, 

1214 iteration=self.iteration, 

1215 loop_name=self.fsm.name, 

1216 started_at=self.started_at, 

1217 elapsed_ms=_now_ms() - self.start_time_ms + self.elapsed_offset_ms, 

1218 ) 

1219 

1220 def _emit(self, event: str, data: dict[str, Any]) -> None: 

1221 """Emit an event via the callback.""" 

1222 self.event_callback( 

1223 { 

1224 "event": event, 

1225 "ts": _iso_now(), 

1226 **data, 

1227 } 

1228 ) 

1229 

1230 def _handle_rate_limit(self, state: StateConfig, state_name: str) -> tuple[bool, str | None]: 

1231 """Handle a detected 429/rate-limit action outcome. 

1232 

1233 Implements the two-tier retry ladder: 

1234 1. Short-burst tier: up to ``max_rate_limit_retries`` attempts with 

1235 exponential backoff (``rate_limit_backoff_base_seconds * 2^n + jitter``). 

1236 2. Long-wait tier: walks ``rate_limit_long_wait_ladder`` with index capped 

1237 at the last entry, accumulating ``total_wait_seconds``. 

1238 

1239 Routes to ``on_rate_limit_exhausted`` (falling back to ``on_error``) only 

1240 once ``total_wait_seconds >= rate_limit_max_wait_seconds``. Emits 

1241 ``rate_limit_exhausted`` on routing (including tier counters) and 

1242 ``rate_limit_storm`` when consecutive exhaustions reach the threshold. 

1243 

1244 Returns: 

1245 (handled, target). ``handled=True`` means the caller should return 

1246 ``target`` directly (in-place retry uses ``state_name``; exhaustion 

1247 uses the routed target). ``handled=False`` should not occur for the 

1248 current rate-limit classification path but is reserved for future 

1249 extensions. 

1250 """ 

1251 _short_max = ( 

1252 state.max_rate_limit_retries 

1253 if state.max_rate_limit_retries is not None 

1254 else _DEFAULT_RATE_LIMIT_RETRIES 

1255 ) 

1256 _backoff_base = ( 

1257 state.rate_limit_backoff_base_seconds 

1258 if state.rate_limit_backoff_base_seconds is not None 

1259 else _DEFAULT_RATE_LIMIT_BACKOFF_BASE 

1260 ) 

1261 _max_wait = ( 

1262 state.rate_limit_max_wait_seconds 

1263 if state.rate_limit_max_wait_seconds is not None 

1264 else _DEFAULT_RATE_LIMIT_MAX_WAIT_SECONDS 

1265 ) 

1266 _ladder = ( 

1267 state.rate_limit_long_wait_ladder 

1268 if state.rate_limit_long_wait_ladder is not None 

1269 else _DEFAULT_RATE_LIMIT_LONG_WAIT_LADDER 

1270 ) 

1271 

1272 record = self._rate_limit_retries.get(state_name) 

1273 if record is None: 

1274 record = { 

1275 "short_retries": 0, 

1276 "long_retries": 0, 

1277 "total_wait_seconds": 0.0, 

1278 "first_seen_at": time.time(), 

1279 } 

1280 self._rate_limit_retries[state_name] = record 

1281 

1282 short_retries = int(record.get("short_retries", 0)) 

1283 long_retries = int(record.get("long_retries", 0)) 

1284 total_wait = float(record.get("total_wait_seconds", 0.0)) 

1285 

1286 if short_retries < _short_max: 

1287 # Short-burst tier — exponential backoff with jitter. Budget is not 

1288 # checked here; short-tier always advances to long-wait on exhaustion. 

1289 short_retries += 1 

1290 record["short_retries"] = short_retries 

1291 _sleep = _backoff_base * (2 ** (short_retries - 1)) + random.uniform(0, _backoff_base) 

1292 if self._circuit is not None: 

1293 self._circuit.record_rate_limit(_sleep) 

1294 total_wait += self._interruptible_sleep(_sleep) 

1295 record["total_wait_seconds"] = total_wait 

1296 return True, state_name # retry in place 

1297 

1298 # Long-wait tier — walk ladder with capped index. 

1299 long_retries += 1 

1300 record["long_retries"] = long_retries 

1301 _idx = min(long_retries - 1, len(_ladder) - 1) 

1302 _wait = float(_ladder[_idx]) 

1303 if self._circuit is not None: 

1304 self._circuit.record_rate_limit(_wait) 

1305 _tier_start = time.time() 

1306 _deadline = _tier_start + _wait 

1307 _total_wait_before_tier = total_wait 

1308 total_wait += self._interruptible_sleep( 

1309 _wait, 

1310 on_heartbeat=lambda elapsed: self._emit( 

1311 RATE_LIMIT_WAITING_EVENT, 

1312 { 

1313 "state": state_name, 

1314 "elapsed_seconds": elapsed, 

1315 "next_attempt_at": _deadline, 

1316 "total_waited_seconds": _total_wait_before_tier + elapsed, 

1317 "budget_seconds": _max_wait, 

1318 "tier": "long_wait", 

1319 }, 

1320 ), 

1321 ) 

1322 record["total_wait_seconds"] = total_wait 

1323 if total_wait >= _max_wait: 

1324 return True, self._exhaust_rate_limit(state, state_name, record) 

1325 return True, state_name # retry in place 

1326 

1327 def _maybe_wait_for_circuit(self, state: StateConfig) -> None: 

1328 """Pre-action circuit-breaker check: sleep until shared 429 recovery. 

1329 

1330 Skips quietly when no circuit is injected, when the state's action is not 

1331 an LLM-quota consumer, or when the circuit has no active recovery window. 

1332 """ 

1333 if self._circuit is None: 

1334 return 

1335 if self._action_mode(state) != "prompt": 

1336 return 

1337 recovery = self._circuit.get_estimated_recovery() 

1338 if recovery is None: 

1339 return 

1340 wait = recovery - time.time() 

1341 if wait > 0: 

1342 self._interruptible_sleep(wait) 

1343 

1344 def _interruptible_sleep( 

1345 self, 

1346 duration: float, 

1347 on_heartbeat: Callable[[float], None] | None = None, 

1348 ) -> float: 

1349 """Sleep for up to ``duration`` seconds in 100ms ticks, exiting promptly 

1350 on ``_shutdown_requested``. Returns the actual elapsed seconds so callers 

1351 can accumulate wall-clock time spent in rate-limit waits. 

1352 

1353 If ``on_heartbeat`` is provided, it is invoked with the elapsed seconds 

1354 roughly every ``_RATE_LIMIT_HEARTBEAT_INTERVAL`` seconds so UIs can show 

1355 live progress during long waits. The short-tier call site intentionally 

1356 omits the callback to preserve backward-compatible silent behavior. 

1357 """ 

1358 if duration <= 0: 

1359 return 0.0 

1360 _start = time.time() 

1361 _deadline = _start + duration 

1362 last_heartbeat = _start 

1363 while time.time() < _deadline: 

1364 if self._shutdown_requested: 

1365 break 

1366 time.sleep(min(0.1, _deadline - time.time())) 

1367 if on_heartbeat is not None: 

1368 _now = time.time() 

1369 if _now - last_heartbeat >= _RATE_LIMIT_HEARTBEAT_INTERVAL: 

1370 on_heartbeat(_now - _start) 

1371 last_heartbeat = _now 

1372 return time.time() - _start 

1373 

1374 def _exhaust_rate_limit( 

1375 self, state: StateConfig, state_name: str, record: dict[str, Any] 

1376 ) -> str | None: 

1377 """Finalize rate-limit exhaustion: emit event, storm detection, and 

1378 return the routed target. Pops the per-state record and is called only 

1379 once the wall-clock budget is spent. 

1380 """ 

1381 self._rate_limit_retries.pop(state_name, None) 

1382 target = state.on_rate_limit_exhausted or state.on_error 

1383 self._emit( 

1384 RATE_LIMIT_EXHAUSTED_EVENT, 

1385 { 

1386 "state": state_name, 

1387 "retries": int(record.get("short_retries", 0)) + int(record.get("long_retries", 0)), 

1388 "short_retries": int(record.get("short_retries", 0)), 

1389 "long_retries": int(record.get("long_retries", 0)), 

1390 "total_wait_seconds": float(record.get("total_wait_seconds", 0.0)), 

1391 "next": target, 

1392 }, 

1393 ) 

1394 self._consecutive_rate_limit_exhaustions += 1 

1395 if self._consecutive_rate_limit_exhaustions >= _RATE_LIMIT_STORM_THRESHOLD: 

1396 self._emit( 

1397 RATE_LIMIT_STORM_EVENT, 

1398 { 

1399 "state": state_name, 

1400 "count": self._consecutive_rate_limit_exhaustions, 

1401 }, 

1402 ) 

1403 return target 

1404 

1405 def _handle_api_error(self, state: StateConfig, state_name: str) -> tuple[bool, str | None]: 

1406 """Handle a detected API server error with short-burst flat backoff. 

1407 

1408 Unlike ``_handle_rate_limit``, uses a flat backoff with no long-wait tier and 

1409 falls through to normal FSM routing after ``_DEFAULT_API_ERROR_RETRIES`` attempts 

1410 so transient infrastructure hiccups don't permanently misdirect the loop. 

1411 

1412 Returns: 

1413 ``(True, state_name)`` to retry the state in place, or 

1414 ``(False, None)`` when the retry budget is exhausted (caller falls 

1415 through to normal verdict routing). 

1416 """ 

1417 record = self._api_error_retries.setdefault(state_name, {"retries": 0, "total_wait": 0.0}) 

1418 if record["retries"] >= _DEFAULT_API_ERROR_RETRIES: 

1419 self._api_error_retries.pop(state_name, None) 

1420 self._emit("api_error_exhausted", {"state": state_name, "retries": record["retries"]}) 

1421 return False, None 

1422 record["retries"] += 1 

1423 slept = self._interruptible_sleep(_DEFAULT_API_ERROR_BACKOFF) 

1424 record["total_wait"] += slept 

1425 self._emit( 

1426 "api_error_retry", 

1427 { 

1428 "state": state_name, 

1429 "attempt": record["retries"], 

1430 "backoff": _DEFAULT_API_ERROR_BACKOFF, 

1431 }, 

1432 ) 

1433 return True, state_name 

1434 

1435 def _finish(self, terminated_by: str, error: str | None = None) -> ExecutionResult: 

1436 """Finalize execution and return result.""" 

1437 self._emit( 

1438 "loop_complete", 

1439 { 

1440 "final_state": self.current_state, 

1441 "iterations": self.iteration, 

1442 "terminated_by": terminated_by, 

1443 }, 

1444 ) 

1445 

1446 return ExecutionResult( 

1447 final_state=self.current_state, 

1448 iterations=self.iteration, 

1449 terminated_by=terminated_by, 

1450 duration_ms=_now_ms() - self.start_time_ms + self.elapsed_offset_ms, 

1451 captured=self.captured, 

1452 error=error, 

1453 ) 

1454 

1455 def _handle_handoff(self, signal: DetectedSignal) -> ExecutionResult: 

1456 """Handle a detected handoff signal. 

1457 

1458 Emits a handoff_detected event and optionally invokes the handoff handler. 

1459 

1460 Args: 

1461 signal: The detected handoff signal 

1462 

1463 Returns: 

1464 ExecutionResult with handoff information 

1465 """ 

1466 self._emit( 

1467 "handoff_detected", 

1468 { 

1469 "state": self.current_state, 

1470 "iteration": self.iteration, 

1471 "continuation": signal.payload, 

1472 }, 

1473 ) 

1474 

1475 # Invoke handler if configured 

1476 if self.handoff_handler: 

1477 result = self.handoff_handler.handle(self.fsm.name, signal.payload) 

1478 if result.spawned_process is not None: 

1479 self._emit( 

1480 "handoff_spawned", 

1481 { 

1482 "pid": result.spawned_process.pid, 

1483 "state": self.current_state, 

1484 }, 

1485 ) 

1486 

1487 return ExecutionResult( 

1488 final_state=self.current_state, 

1489 iterations=self.iteration, 

1490 terminated_by="handoff", 

1491 duration_ms=_now_ms() - self.start_time_ms + self.elapsed_offset_ms, 

1492 captured=self.captured, 

1493 handoff=True, 

1494 continuation_prompt=signal.payload, 

1495 )