Coverage for little_loops / fsm / executor.py: 97%
576 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""FSM Executor - Runtime engine for FSM loop execution.
3This module provides the execution engine that runs FSM loops:
4- Executes actions (shell commands or slash commands)
5- Evaluates results using appropriate evaluators
6- Routes to next states based on verdicts
7- Tracks iteration count and enforces limits
8- Manages captured variables and context
9"""
11from __future__ import annotations
13import json
14import random
15import subprocess
16import threading
17import time
18from collections.abc import Callable
19from dataclasses import dataclass
20from datetime import UTC, datetime
21from pathlib import Path
22from typing import Any
24from little_loops.fsm.evaluators import (
25 EvaluationResult,
26 evaluate,
27 evaluate_exit_code,
28 evaluate_llm_structured,
29 evaluate_mcp_result,
30)
31from little_loops.fsm.handoff_handler import HandoffHandler
32from little_loops.fsm.interpolation import (
33 InterpolationContext,
34 InterpolationError,
35 interpolate,
36 interpolate_dict,
37)
38from little_loops.fsm.rate_limit_circuit import RateLimitCircuit
39from little_loops.fsm.runners import (
40 ActionRunner,
41 DefaultActionRunner,
42 SimulationActionRunner, # noqa: F401 — re-exported for backward compatibility
43 _now_ms,
44)
45from little_loops.fsm.schema import FSMLoop, StateConfig
46from little_loops.fsm.signal_detector import DetectedSignal, SignalDetector
47from little_loops.fsm.types import ActionResult, Evaluator, EventCallback, ExecutionResult
48from little_loops.issue_lifecycle import FailureType, classify_failure
49from little_loops.session_log import get_current_session_jsonl
# --- Rate-limit (429 / quota) handling -------------------------------------
# Per-state rate-limit retries allowed before rate_limit_exhausted is emitted.
_DEFAULT_RATE_LIMIT_RETRIES: int = 3
# Backoff base (seconds); the sleep is base * 2^(attempt-1) + uniform(0, base).
_DEFAULT_RATE_LIMIT_BACKOFF_BASE: int = 30
# Total wall-clock budget (seconds) for the short and long tiers combined
# before routing to on_rate_limit_exhausted. Mirrors the
# RateLimitsConfig.max_wait_seconds default (6h).
_DEFAULT_RATE_LIMIT_MAX_WAIT_SECONDS: int = 21600
# Long-wait tier ladder (seconds), used once the short-tier budget is spent.
# Mirrors the RateLimitsConfig.long_wait_ladder default.
_DEFAULT_RATE_LIMIT_LONG_WAIT_LADDER: list[int] = [300, 900, 1800, 3600]
# Emitted when a state's rate-limit retries are used up.
RATE_LIMIT_EXHAUSTED_EVENT: str = "rate_limit_exhausted"
# Emitted when consecutive exhaustions hit the storm threshold below.
RATE_LIMIT_STORM_EVENT: str = "rate_limit_storm"
# Heartbeat emitted roughly every minute during a long-wait sleep for live UIs.
RATE_LIMIT_WAITING_EVENT: str = "rate_limit_waiting"
# Seconds between rate_limit_waiting heartbeats while in a long-wait sleep.
_RATE_LIMIT_HEARTBEAT_INTERVAL: float = 60.0
# Consecutive rate_limit_exhausted events that count as a storm.
_RATE_LIMIT_STORM_THRESHOLD: int = 3

# --- Progressive tool-call throttling (per state visit) ---------------------
# Calls 1..normal_max pass through; at warn_max a warning is emitted; at
# hard_max the state routes to on_throttle_hard; past hard_max it hard-stops.
_DEFAULT_THROTTLE_NORMAL_MAX: int = 3
_DEFAULT_THROTTLE_WARN_MAX: int = 8
_DEFAULT_THROTTLE_HARD_MAX: int = 12
THROTTLE_WARN_EVENT: str = "throttle_warn"
THROTTLE_HARD_EVENT: str = "throttle_hard"
THROTTLE_STOP_EVENT: str = "throttle_stop"

# --- Circuit breaker scope ---------------------------------------------------
# Action types that spend LLM quota and are therefore gated by the shared
# circuit breaker. `_action_mode()` maps both to "prompt"; this set documents intent.
LLM_ACTION_TYPES: frozenset[str] = frozenset({"slash_command", "prompt"})

# --- API server-error retries ---------------------------------------------
# Per-state retries for API server errors before normal routing takes over.
_DEFAULT_API_ERROR_RETRIES: int = 2
# Fixed delay (seconds) between those retries; there is no exponential ladder.
_DEFAULT_API_ERROR_BACKOFF: int = 30
89def _iso_now() -> str:
90 """Get current time as ISO 8601 string."""
91 return datetime.now(UTC).isoformat()
@dataclass
class RouteContext:
    """Snapshot of one routing decision, handed to route interceptors.

    The executor builds one after a state's action has run and been evaluated.
    It passes the same object to the ``before_route`` hook and then to the
    ``after_route`` hook of every interceptor that defines them.
    """

    state_name: str  # name of the state that just executed
    state: StateConfig  # that state's configuration
    verdict: str  # verdict used for routing ("yes" when there was no evaluation result)
    action_result: ActionResult | None  # None when the state ran no action
    eval_result: EvaluationResult | None  # None when evaluation produced nothing
    ctx: InterpolationContext  # context used to interpolate route targets
    iteration: int  # executor iteration counter at routing time
@dataclass
class RouteDecision:
    """Override that an interceptor's ``before_route`` hook may return.

    The executor reacts to the hook's return value as follows:

    * ``None`` (nothing returned): no override, so normal ``_route()`` routing runs.
    * ``RouteDecision("state")``: redirect. ``_route()`` is skipped and
      ``"state"`` becomes the next state.
    * ``RouteDecision(None)``: veto. ``_execute_state()`` returns ``None``,
      which ends the run via ``_finish("error")``. The exception is maintain
      mode, which substitutes its restart target for a ``None`` next state.
    """

    next_state: str | None  # a state name redirects; None vetoes the transition
120class FSMExecutor:
121 """Execute an FSM loop.
123 The executor runs an FSM from its initial state until:
124 - A terminal state is reached
125 - max_iterations is exceeded
126 - A timeout occurs
127 - A shutdown signal is received
128 - An unrecoverable error occurs
130 Events are emitted via the callback for observability.
131 """
133 def __init__(
134 self,
135 fsm: FSMLoop,
136 event_callback: EventCallback | None = None,
137 action_runner: ActionRunner | None = None,
138 signal_detector: SignalDetector | None = None,
139 handoff_handler: HandoffHandler | None = None,
140 loops_dir: Path | None = None,
141 circuit: RateLimitCircuit | None = None,
142 ):
143 """Initialize the executor.
145 Args:
146 fsm: The FSM loop to execute
147 event_callback: Optional callback for events
148 action_runner: Optional custom action runner (for testing)
149 signal_detector: Optional signal detector for output parsing
150 handoff_handler: Optional handler for handoff signals
151 loops_dir: Base directory for resolving sub-loop references
152 circuit: Optional shared rate-limit circuit breaker for 429 coordination
153 """
154 self.fsm = fsm
155 self.event_callback = event_callback or (lambda _: None)
156 self.action_runner: ActionRunner = action_runner or DefaultActionRunner()
157 self.signal_detector = signal_detector
158 self.handoff_handler = handoff_handler
159 self.loops_dir = loops_dir
160 self._circuit = circuit
162 # Runtime state
163 self.current_state = fsm.initial
164 self.iteration = 0
165 self.captured: dict[str, dict[str, Any]] = {}
166 self.prev_result: dict[str, Any] | None = None
167 self.started_at = ""
168 self.start_time_ms = 0
169 self.elapsed_offset_ms = (
170 0 # milliseconds from segments before current run (set by PersistentExecutor on resume)
171 )
173 # Shutdown flag for graceful signal handling
174 self._shutdown_requested = False
176 # Currently running MCP subprocess (set by _run_subprocess, cleared in finally).
177 # Enables external shutdown code to kill the process on SIGTERM.
178 self._current_process: subprocess.Popen[str] | None = None
180 # Pending handoff signal (set by _run_action, checked by main loop)
181 self._pending_handoff: DetectedSignal | None = None
183 # Pending error payload from FATAL_ERROR signal (set by _run_action, checked by main loop)
184 self._pending_error: str | None = None
186 # Per-state retry tracking for max_retries support.
187 # _retry_counts[state_name] = number of consecutive re-entries into that state.
188 # Incremented each time we enter the same state as the previous iteration.
189 # Reset when a different state is entered, or after retry exhaustion.
190 self._retry_counts: dict[str, int] = {}
191 # State entered in the previous iteration (None on first iteration or after resume).
192 self._prev_state: str | None = None
194 # BUG-1226: true between emitting a `route` event and entering the
195 # target state. Gates the "flush one pending shell state on timeout"
196 # behavior so we only flush when there is actually a pending state.
197 self._just_routed: bool = False
199 # Per-state rate-limit retry tracking (parallel to _retry_counts).
200 # _rate_limit_retries[state_name] = dict-of-record:
201 # {
202 # "short_retries": int, # attempts in short-burst tier
203 # "long_retries": int, # attempts in long-wait tier
204 # "total_wait_seconds": float,
205 # "first_seen_at": float | None, # epoch timestamp of first 429
206 # }
207 # Incremented inside _handle_rate_limit on each detected rate-limit response.
208 # Reset when the state completes without a rate-limit, or after exhaustion.
209 self._rate_limit_retries: dict[str, dict[str, Any]] = {}
211 # Consecutive rate_limit_exhausted emissions across all states. Reset on any
212 # successful non-rate-limited state transition. When this reaches
213 # _RATE_LIMIT_STORM_THRESHOLD, a RATE_LIMIT_STORM event is emitted.
214 self._consecutive_rate_limit_exhaustions: int = 0
216 # Per-state API server error retry tracking (parallel to _rate_limit_retries).
217 # _api_error_retries[state_name] = {"retries": int, "total_wait": float}
218 # Reset when the state completes without a server error, or after exhaustion.
219 self._api_error_retries: dict[str, dict[str, Any]] = {}
221 # Per-state tool-call throttle counter. Counts successive action executions within
222 # a single continuous state visit. Reset on state exit; NOT serialized to LoopState
223 # (throttle counts measure instantaneous visit-level activity, not cumulative retries).
224 self._throttle_counts: dict[str, int] = {}
226 # Per-edge revisit counter for cycle detection.
227 # _edge_revisit_counts["from_state->to_state"] = number of times that edge has fired.
228 # When any edge exceeds max_edge_revisits, the loop terminates with cycle_detected.
229 self._edge_revisit_counts: dict[str, int] = {}
231 # Nesting depth for sub-loop event forwarding (0 = top-level, 1+ = sub-loop).
232 # Set by the parent executor when constructing child executors.
233 self._depth: int = 0
235 # Extension hook registries — populated by wire_extensions()
236 self._contributed_actions: dict[str, ActionRunner] = {}
237 self._contributed_evaluators: dict[str, Evaluator] = {}
238 self._interceptors: list[Any] = []
240 def request_shutdown(self) -> None:
241 """Request graceful shutdown of the executor.
243 Sets a flag that will be checked at the start of each iteration,
244 allowing the loop to exit cleanly after the current state completes.
245 """
246 self._shutdown_requested = True
248 def run(self) -> ExecutionResult:
249 """Execute the FSM until terminal state or limits reached.
251 Returns:
252 ExecutionResult with final state and execution metadata
253 """
254 self.started_at = _iso_now()
255 self.start_time_ms = _now_ms()
257 self._emit("loop_start", {"loop": self.fsm.name})
259 try:
260 while True:
261 # Check shutdown request (signal handling)
262 if self._shutdown_requested:
263 return self._finish("signal")
265 # Check iteration limit
266 if self.iteration >= self.fsm.max_iterations:
267 return self._finish("max_iterations")
269 # Check timeout
270 if self.fsm.timeout:
271 elapsed = _now_ms() - self.start_time_ms + self.elapsed_offset_ms
272 if elapsed > self.fsm.timeout * 1000:
273 # BUG-1226: if timeout fires in the race window between
274 # a `route` event and `state_enter`, flush one pending
275 # shell-action state before honoring the timeout so its
276 # side effect (e.g. copying a handshake flag) is not
277 # silently lost. Bounded to shell actions — slash
278 # commands and sub-loops would violate the timeout
279 # budget. Single-step: no cascade.
280 if self._just_routed:
281 pending = self.fsm.states.get(self.current_state)
282 if (
283 pending is not None
284 and not pending.terminal
285 and pending.loop is None
286 and pending.action is not None
287 and self._action_mode(pending) == "shell"
288 ):
289 self._flush_pending_shell_state(pending)
290 return self._finish("timeout")
292 # Get current state config
293 state_config = self.fsm.states[self.current_state]
295 # Update per-state retry tracking based on transition from previous iteration.
296 # If re-entering the same state consecutively, increment retry count.
297 # If entering a different state, clear the previous state's retry count.
298 if self._prev_state is not None:
299 if self.current_state == self._prev_state:
300 self._retry_counts[self.current_state] = (
301 self._retry_counts.get(self.current_state, 0) + 1
302 )
303 else:
304 self._retry_counts.pop(self._prev_state, None)
305 self._throttle_counts.pop(self._prev_state, None)
307 # Check terminal
308 if state_config.terminal:
309 # Handle maintain mode - restart loop instead of terminating
310 if self.fsm.maintain:
311 self.iteration += 1
312 maintain_target = state_config.on_maintain or self.fsm.initial
313 self._emit(
314 "route",
315 {
316 "from": self.current_state,
317 "to": maintain_target,
318 "reason": "maintain",
319 },
320 )
321 self._prev_state = self.current_state
322 self.current_state = maintain_target
323 self._just_routed = True
324 continue
325 return self._finish("terminal")
327 # Check per-state retry limit. If the consecutive re-entry count exceeds
328 # max_retries, skip execution and route to on_retry_exhausted instead.
329 if state_config.max_retries is not None:
330 retry_count = self._retry_counts.get(self.current_state, 0)
331 if retry_count > state_config.max_retries:
332 # on_retry_exhausted is guaranteed non-None by validation when
333 # max_retries is set, but we fall back to an error if misconfigured.
334 exhausted_state: str = state_config.on_retry_exhausted or ""
335 if not exhausted_state:
336 return self._finish(
337 "error",
338 error=f"State '{self.current_state}' exceeded max_retries "
339 "but on_retry_exhausted is not set",
340 )
341 self._emit(
342 "retry_exhausted",
343 {
344 "state": self.current_state,
345 "retries": retry_count,
346 "next": exhausted_state,
347 },
348 )
349 self._retry_counts.pop(self.current_state, None)
350 self._prev_state = self.current_state
351 self.current_state = exhausted_state
352 continue
354 self.iteration += 1
355 self._just_routed = False
356 self._emit(
357 "state_enter",
358 {
359 "state": self.current_state,
360 "iteration": self.iteration,
361 },
362 )
364 # Execute state
365 next_state = self._execute_state(state_config)
367 # Check for pending error signal (FATAL_ERROR)
368 if self._pending_error is not None:
369 return self._finish("error", error=self._pending_error)
371 # Check for pending handoff signal
372 if self._pending_handoff:
373 return self._handle_handoff(self._pending_handoff)
375 # Handle maintain mode
376 if next_state is None and self.fsm.maintain:
377 next_state = state_config.on_maintain or self.fsm.initial
379 # SIGKILL in _execute_state sets shutdown flag and returns None
380 if next_state is None and self._shutdown_requested:
381 return self._finish("signal")
383 if next_state is None:
384 return self._finish("error", error="No valid transition")
386 # At this point next_state is guaranteed to be str
387 resolved_next: str = next_state
389 self._emit(
390 "route",
391 {
392 "from": self.current_state,
393 "to": resolved_next,
394 },
395 )
397 # Per-edge revisit tracking for cycle detection.
398 edge_key = f"{self.current_state}->{resolved_next}"
399 self._edge_revisit_counts[edge_key] = self._edge_revisit_counts.get(edge_key, 0) + 1
400 if self._edge_revisit_counts[edge_key] > self.fsm.max_edge_revisits:
401 self._emit(
402 "cycle_detected",
403 {
404 "edge": edge_key,
405 "from": self.current_state,
406 "to": resolved_next,
407 "count": self._edge_revisit_counts[edge_key],
408 "max": self.fsm.max_edge_revisits,
409 },
410 )
411 return self._finish(
412 "cycle_detected",
413 error=f"Cycle detected: edge {edge_key} traversed "
414 f"{self._edge_revisit_counts[edge_key]} times "
415 f"(limit: {self.fsm.max_edge_revisits})",
416 )
418 self._prev_state = self.current_state
419 self.current_state = resolved_next
420 self._just_routed = True
422 # Interruptible backoff sleep between iterations
423 if self.fsm.backoff and self.fsm.backoff > 0:
424 deadline = time.time() + self.fsm.backoff
425 while time.time() < deadline:
426 if self._shutdown_requested:
427 break
428 time.sleep(min(0.1, deadline - time.time()))
430 except InterpolationError as exc:
431 return self._finish(
432 "error",
433 error=(
434 f"Missing context variable in state '{self.current_state}': {exc}. "
435 f"Run with: ll-loop run {self.fsm.name} --context KEY=VALUE"
436 ),
437 )
438 except Exception as exc:
439 return self._finish("error", error=str(exc))
441 def _execute_sub_loop(self, state: StateConfig, ctx: InterpolationContext) -> str | None:
442 """Execute a sub-loop state by loading and running a child FSM.
444 Args:
445 state: The state configuration with loop field set
446 ctx: Interpolation context for routing
448 Returns:
449 Next state name based on child loop verdict, or None
450 """
451 from little_loops.cli.loop._helpers import resolve_loop_path
452 from little_loops.fsm.validation import load_and_validate
454 assert state.loop is not None # guarded by caller
455 loop_name = interpolate(state.loop, ctx)
456 loop_path = resolve_loop_path(loop_name, self.loops_dir or Path(".loops"))
457 child_fsm, _ = load_and_validate(loop_path)
459 # Bind child context: explicit with: bindings take precedence over legacy passthrough
460 if state.with_:
461 from little_loops.fsm.interpolation import interpolate_dict
463 resolved = interpolate_dict(state.with_, ctx)
464 # Apply declared defaults for unbound optional parameters
465 for param_name, param_spec in child_fsm.parameters.items():
466 if (
467 param_name not in resolved
468 and not param_spec.required
469 and param_spec.default is not None
470 ):
471 resolved[param_name] = param_spec.default
472 # Runtime check: required parameters must be present after interpolation
473 for param_name, param_spec in child_fsm.parameters.items():
474 if param_spec.required and param_name not in resolved:
475 raise ValueError(
476 f"Sub-loop '{state.loop}' requires parameter '{param_name}' "
477 f"but it is not bound in 'with'"
478 )
479 # Merge: child's own context block provides base; with: bindings override
480 child_fsm.context = {**child_fsm.context, **resolved}
481 elif state.context_passthrough:
482 # Extract .output strings from capture result dicts so ${context.key} resolves
483 # to the plain output string (e.g. "ENH-123") rather than the full capture object.
484 captured_as_context = {
485 k: v["output"] if isinstance(v, dict) and "exit_code" in v else v
486 for k, v in self.captured.items()
487 }
488 child_fsm.context = {**self.fsm.context, **captured_as_context, **child_fsm.context}
490 depth = self._depth + 1
491 child_events: list[dict] = []
493 def _sub_event_callback(event: dict) -> None:
494 child_events.append(event)
495 # Only inject depth if not already set by a deeper nested sub-loop
496 if "depth" not in event:
497 self.event_callback({**event, "depth": depth})
498 else:
499 self.event_callback(event)
501 child_executor = FSMExecutor(
502 child_fsm,
503 action_runner=self.action_runner,
504 loops_dir=self.loops_dir,
505 event_callback=_sub_event_callback,
506 circuit=self._circuit,
507 )
508 child_executor._depth = depth # propagate depth for further nesting
510 # Clamp child timeout to parent's remaining wall-clock budget so a slow sub-loop
511 # can't silently consume the parent's deadline with no recourse for the parent FSM.
512 if self.fsm.timeout:
513 elapsed_ms = _now_ms() - self.start_time_ms + self.elapsed_offset_ms
514 remaining_s = max(1, int((self.fsm.timeout * 1000 - elapsed_ms) // 1000))
515 if child_fsm.timeout is None or child_fsm.timeout > remaining_s:
516 child_fsm.timeout = remaining_s
518 child_result = child_executor.run()
520 # Capture child event stream as a JSON-lines string if the state declares a capture key
521 if state.capture and child_events:
522 import json as _json
524 self.captured[state.capture] = {
525 "output": "\n".join(_json.dumps(e) for e in child_events),
526 "exit_code": None,
527 }
529 # Merge child captures back into parent under the state name
530 if (state.context_passthrough or state.with_) and child_executor.captured:
531 self.captured[self.current_state] = child_executor.captured
533 # Route based on child termination reason and terminal state name
534 if child_result.terminated_by == "terminal":
535 if child_result.final_state == "done":
536 return interpolate(state.on_yes, ctx) if state.on_yes else None
537 else:
538 # Reached a non-done terminal (e.g. "failed") → failure
539 return interpolate(state.on_no, ctx) if state.on_no else None
540 elif child_result.terminated_by == "error":
541 # Runtime child failure (not a YAML load error)
542 if state.on_error:
543 return interpolate(state.on_error, ctx)
544 return interpolate(state.on_no, ctx) if state.on_no else None
545 else:
546 # max_iterations, timeout, signal — all are failure
547 return interpolate(state.on_no, ctx) if state.on_no else None
549 def _execute_learning_state(self, state: StateConfig, ctx: InterpolationContext) -> str | None:
550 """Execute a FEAT-1283 ``type: learning`` state.
552 Iterates ``state.learning.targets`` in order. For each target:
553 1. Look up its record in the learning-tests registry (ENH-1282).
554 2. If proven → continue.
555 3. If refuted → emit ``learning_target_refuted`` + ``learning_blocked``
556 and route to ``on_blocked`` (preferred) or ``on_no``.
557 4. If missing or stale → emit ``learning_target_stale`` and invoke
558 ``/ll:explore-api <target>`` via the executor's action_runner;
559 re-check the registry; repeat up to ``max_retries`` times before
560 emitting ``learning_blocked`` (reason ``retries_exhausted``) and
561 routing to ``on_blocked``/``on_no``.
563 When every target ends up proven, emit ``learning_complete`` and route
564 to ``on_yes``. Returns the resolved next-state name (or ``None`` when no
565 route is configured for the terminal verdict, mirroring ``_route``).
566 """
567 from little_loops.learning_tests import check_learning_test
569 assert state.learning is not None # guarded by caller
571 def _blocked_target(reason: str, target: str) -> str | None:
572 self._emit(
573 "learning_blocked",
574 {"state": self.current_state, "target": target, "reason": reason},
575 )
576 route = state.on_blocked or state.on_no
577 return interpolate(route, ctx) if route else None
579 for target in state.learning.targets:
580 record = check_learning_test(target)
582 attempts = 0
583 while record is None or record.status == "stale":
584 if attempts >= state.learning.max_retries:
585 return _blocked_target("retries_exhausted", target)
587 if record is None:
588 self._emit(
589 "learning_target_stale",
590 {"state": self.current_state, "target": target, "cause": "missing"},
591 )
592 else:
593 self._emit(
594 "learning_target_stale",
595 {"state": self.current_state, "target": target, "cause": "stale"},
596 )
598 self._emit(
599 "learning_explore_invoked",
600 {"state": self.current_state, "target": target, "attempt": attempts + 1},
601 )
602 self._run_action(f"/ll:explore-api {target}", state, ctx)
603 attempts += 1
604 record = check_learning_test(target)
606 if record.status == "refuted":
607 self._emit(
608 "learning_target_refuted",
609 {"state": self.current_state, "target": target},
610 )
611 return _blocked_target("refuted", target)
613 self._emit(
614 "learning_target_proven",
615 {"state": self.current_state, "target": target},
616 )
618 self._emit(
619 "learning_complete",
620 {"state": self.current_state, "targets": list(state.learning.targets)},
621 )
622 return interpolate(state.on_yes, ctx) if state.on_yes else None
624 def _check_throttle(self, state: StateConfig, state_name: str) -> str | None:
625 """Increment the per-state tool-call counter and enforce throttle thresholds.
627 Called after every action execution within a state visit. Returns the forced
628 next-state name when the hard threshold is reached, or None when execution
629 should continue normally (warn events are emitted but do not redirect).
631 Sets self._pending_error and returns "__STOP__" when the call count exceeds
632 hard_max with no on_throttle_hard target — the caller must propagate this as
633 a None return from _execute_state so the main loop detects _pending_error.
634 """
635 count = self._throttle_counts.get(state_name, 0) + 1
636 self._throttle_counts[state_name] = count
638 throttle = state.throttle
639 normal_max = (
640 throttle.normal_max
641 if (throttle and throttle.normal_max is not None)
642 else _DEFAULT_THROTTLE_NORMAL_MAX
643 )
644 warn_max = (
645 throttle.warn_max
646 if (throttle and throttle.warn_max is not None)
647 else _DEFAULT_THROTTLE_WARN_MAX
648 )
649 hard_max = (
650 throttle.hard_max
651 if (throttle and throttle.hard_max is not None)
652 else _DEFAULT_THROTTLE_HARD_MAX
653 )
655 if count == warn_max:
656 self._emit(
657 THROTTLE_WARN_EVENT,
658 {
659 "state": state_name,
660 "count": count,
661 "normal_max": normal_max,
662 "warn_max": warn_max,
663 "hard_max": hard_max,
664 },
665 )
667 # States with type="learning" (FEAT-1283) are exempt from hard_max — they
668 # legitimately make N calls per visit (one per unproven target).
669 if state.type == "learning":
670 return None
672 if count == hard_max:
673 next_target = state.on_throttle_hard or state.on_error
674 self._emit(
675 THROTTLE_HARD_EVENT,
676 {"state": state_name, "count": count, "hard_max": hard_max, "next": next_target},
677 )
678 return next_target
680 if count > hard_max:
681 self._emit(
682 THROTTLE_STOP_EVENT,
683 {"state": state_name, "count": count, "hard_max": hard_max},
684 )
685 self._pending_error = (
686 f"Throttle stop: state '{state_name}' exceeded hard_max={hard_max} "
687 "tool calls with no on_throttle_hard target"
688 )
689 return "__STOP__"
691 return None
    def _execute_state(self, state: StateConfig) -> str | None:
        """Execute a single state and return next state name.

        Dispatch order:

        1. Sub-loop states (``state.loop`` set) go to ``_execute_sub_loop``.
           A FileNotFoundError/ValueError from it routes to ``on_error`` when
           configured and is re-raised otherwise.
        2. Configured learning states go to ``_execute_learning_state``.
        3. States with an unconditional ``next`` run their optional action and
           then follow ``next``. A non-zero exit routes to ``on_error`` when
           that is configured.
        4. All other states run their optional action and are evaluated.
           They then pass through rate-limit / API-server-error retry
           handling, then ``before_route`` interceptors, and are finally
           routed by verdict via ``_route``.

        When an action runs to completion, ``self.prev_result`` is updated
        with its output and exit code. This is skipped when a runner-supplied
        route or a throttle redirect short-circuits the state.

        Args:
            state: The state configuration to execute

        Returns:
            Next state name, or None if no valid transition. None is also
            returned in these cases:

            * a throttle stop, with ``self._pending_error`` set;
            * an action killed by a signal while ``on_error`` is unset, with
              shutdown requested;
            * a ``before_route`` interceptor veto.
        """
        # Build interpolation context
        ctx = self._build_context()

        # Dispatch to sub-loop handler if this is a sub-loop state
        if state.loop is not None:
            try:
                return self._execute_sub_loop(state, ctx)
            except (FileNotFoundError, ValueError):
                # Load / parameter-binding failures route to on_error when set;
                # otherwise they propagate to the caller.
                if state.on_error:
                    return interpolate(state.on_error, ctx)
                raise

        # FEAT-1283: dispatch to learning-state handler when both type="learning"
        # AND a LearningConfig is present. The bare `type="learning"` marker
        # (used pre-FEAT-1283 only as a throttle hard_max exemption hint, see
        # ThrottleConfig docstring) falls through to normal action execution.
        if state.type == "learning" and state.learning is not None:
            return self._execute_learning_state(state, ctx)

        # Handle unconditional transition
        if state.next:
            if state.action:
                # NOTE(review): presumably blocks while the shared rate-limit
                # circuit is open — confirm against _maybe_wait_for_circuit.
                self._maybe_wait_for_circuit(state)
                # A non-None `routed` is a next state chosen by the helper
                # itself and short-circuits the rest of this state.
                result, routed = self._run_action_or_route(state, ctx)
                if routed is not None:
                    return routed
                throttle_next = self._check_throttle(state, self.current_state)
                if throttle_next == "__STOP__":
                    # Throttle hard stop: _pending_error is already set.
                    return None
                if throttle_next is not None:
                    return throttle_next
                assert result is not None
                self.prev_result = {
                    "output": result.output,
                    "exit_code": result.exit_code,
                    "state": self.current_state,
                }
                if result.exit_code is not None and result.exit_code < 0:
                    # Process killed by signal — do not silently advance via next
                    if state.on_error:
                        return interpolate(state.on_error, ctx)
                    self.request_shutdown()
                    return None
                # Non-zero exit: if on_error is defined, treat next as success path only
                if result.exit_code != 0 and state.on_error:
                    return interpolate(state.on_error, ctx)
            return interpolate(state.next, ctx)

        # Execute action if present
        action_result = None
        if state.action:
            self._maybe_wait_for_circuit(state)
            action_result, routed = self._run_action_or_route(state, ctx)
            if routed is not None:
                return routed
            throttle_next = self._check_throttle(state, self.current_state)
            if throttle_next == "__STOP__":
                return None
            if throttle_next is not None:
                return throttle_next

        # Evaluate
        eval_result = self._evaluate(state, action_result, ctx)
        # States without an action record an empty output and exit code 0.
        self.prev_result = {
            "output": action_result.output if action_result else "",
            "exit_code": action_result.exit_code if action_result else 0,
            "state": self.current_state,
        }

        # Update context with result for routing interpolation
        if eval_result:
            ctx.result = {
                "verdict": eval_result.verdict,
                "details": eval_result.details,
            }

        # Route based on verdict
        # (a missing evaluation result is treated as a "yes" verdict)
        verdict = eval_result.verdict if eval_result else "yes"
        route_ctx = RouteContext(
            state_name=self.current_state,
            state=state,
            verdict=verdict,
            action_result=action_result,
            eval_result=eval_result,
            ctx=ctx,
            iteration=self.iteration,
        )
        # 429 / rate-limit detection — runs before interceptors so an in-place retry
        # returns early without dispatching to registered before_route hooks.
        if action_result is not None:
            # Classify on stdout and stderr together; either may carry the error text.
            _combined = (action_result.output or "") + "\n" + (action_result.stderr or "")
            _failure_type, _reason = classify_failure(_combined, action_result.exit_code)
            if _failure_type == FailureType.TRANSIENT and (
                "rate limit" in _reason.lower() or "quota" in _reason.lower()
            ):
                _handled, _target = self._handle_rate_limit(state, route_ctx.state_name)
                if _handled:
                    return _target
            elif _failure_type == FailureType.TRANSIENT and "api server error" in _reason.lower():
                _handled, _target = self._handle_api_error(state, route_ctx.state_name)
                if _handled:
                    return _target
                # exhausted — fall through to normal verdict routing
            else:
                # Not rate-limited or server-error: reset counters so future transients start fresh.
                self._rate_limit_retries.pop(route_ctx.state_name, None)
                self._consecutive_rate_limit_exhaustions = 0
                self._api_error_retries.pop(route_ctx.state_name, None)

        # First interceptor returning a RouteDecision wins (redirect or veto).
        for interceptor in self._interceptors:
            if hasattr(interceptor, "before_route"):
                decision = interceptor.before_route(route_ctx)
                if isinstance(decision, RouteDecision):
                    if decision.next_state is None:
                        return None  # veto
                    return decision.next_state  # redirect — bypass _route()
        next_state = self._route(state, verdict, ctx)
        # after_route hooks are notified only when normal routing ran.
        for interceptor in self._interceptors:
            if hasattr(interceptor, "after_route"):
                interceptor.after_route(route_ctx)
        return next_state
def _run_action(
    self,
    action_template: str,
    state: StateConfig,
    ctx: InterpolationContext,
) -> ActionResult:
    """Interpolate and dispatch a state's action, then post-process the result.

    The executor is chosen from ``_action_mode``:
    - ``mcp_tool``: a direct ``mcp-call`` subprocess (``action_runner`` unused),
    - ``contributed``: the runner registered for ``state.action_type``,
    - anything else: ``self.action_runner`` (prompt/slash-command or shell).

    Afterwards an ``action_complete`` event is emitted, the result is stored
    under ``state.capture`` when requested, and stdout is scanned for
    handoff / error / stop signals.

    Args:
        action_template: Action string, possibly containing variables.
        state: Configuration of the state being executed.
        ctx: Context used to interpolate the action and MCP params.

    Returns:
        The ActionResult produced by whichever executor ran the action.
    """
    command = interpolate(action_template, ctx)
    mode = self._action_mode(state)
    prompt_mode = mode == "prompt"

    self._emit("action_start", {"action": command, "is_prompt": prompt_mode})

    def _forward_line(text: str) -> None:
        # Stream each output line to listeners as it arrives.
        self._emit("action_output", {"line": text})

    if mode == "mcp_tool":
        # MCP tools bypass action_runner and are invoked directly.
        params = interpolate_dict(state.params, ctx) if state.params else {}
        outcome = self._run_subprocess(
            ["mcp-call", command, json.dumps(params)],
            timeout=state.timeout or self.fsm.default_timeout or 30,
            on_output_line=_forward_line,
        )
    elif mode == "contributed":
        # _action_mode only yields "contributed" for a registered action_type.
        assert state.action_type is not None
        outcome = self._contributed_actions[state.action_type].run(
            command,
            timeout=state.timeout or self.fsm.default_timeout or 3600,
            is_slash_command=False,
            on_output_line=_forward_line,
        )
    else:
        outcome = self.action_runner.run(
            command,
            timeout=state.timeout or self.fsm.default_timeout or 3600,
            is_slash_command=prompt_mode,
            on_output_line=_forward_line,
            agent=state.agent if prompt_mode else None,
            tools=state.tools if prompt_mode else None,
        )

    complete_payload: dict[str, Any] = {
        "exit_code": outcome.exit_code,
        "duration_ms": outcome.duration_ms,
        # Only the tail of the output is previewed to keep events small.
        "output_preview": outcome.output[-2000:].strip() if outcome.output else None,
        "is_prompt": prompt_mode,
    }
    if prompt_mode:
        jsonl_path = get_current_session_jsonl()
        complete_payload["session_jsonl"] = str(jsonl_path) if jsonl_path else None
    self._emit("action_complete", complete_payload)

    if state.capture:
        self.captured[state.capture] = {
            "output": outcome.output.rstrip("\n\r"),
            "stderr": outcome.stderr,
            "exit_code": outcome.exit_code,
            "duration_ms": outcome.duration_ms,
        }

    detector = self.signal_detector
    detected = detector.detect_first(outcome.output) if detector else None
    if detected:
        kind = detected.signal_type
        if kind == "handoff":
            self._pending_handoff = detected
        elif kind == "error":
            self._pending_error = detected.payload
        elif kind == "stop":
            self.request_shutdown()

    return outcome
def _run_subprocess(
    self,
    cmd: list[str],
    timeout: int,
    on_output_line: Callable[[str], None] | None = None,
) -> ActionResult:
    """Run ``cmd`` directly (no shell) and collect its output into an ActionResult.

    Mirrors the Popen + stderr-drain-thread pattern used by
    DefaultActionRunner. stderr is drained on a daemon thread while stdout
    is streamed line by line on the calling thread.

    The timeout is enforced by a watchdog timer that kills the child. Waiting
    with ``process.wait(timeout=...)`` only after stdout hit EOF could never
    time out a child that hung while keeping stdout open.

    Args:
        cmd: Command and arguments to execute.
        timeout: Wall-clock limit in seconds for the whole call.
        on_output_line: Optional callback receiving each stdout line with
            trailing whitespace stripped; invoked on the calling thread.

    Returns:
        ActionResult with stdout, stderr, exit code and duration. On timeout
        the exit code is 124, stderr falls back to "MCP call timed out" when
        the child wrote nothing to stderr, and duration_ms is
        ``timeout * 1000``.
    """
    start = _now_ms()
    output_chunks: list[str] = []
    stderr_chunks: list[str] = []
    timed_out = threading.Event()

    # The context manager closes the pipes and reaps the child on exit.
    with subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    ) as process:
        self._current_process = process

        def _drain_stderr() -> None:
            assert process.stderr is not None
            for line in process.stderr:
                stderr_chunks.append(line)

        def _kill_on_timeout() -> None:
            timed_out.set()
            process.kill()  # closes stdout, unblocking the read loop below

        stderr_thread = threading.Thread(target=_drain_stderr, daemon=True)
        watchdog = threading.Timer(timeout, _kill_on_timeout)
        watchdog.daemon = True
        stderr_thread.start()
        watchdog.start()
        try:
            assert process.stdout is not None
            for line in process.stdout:
                output_chunks.append(line)
                if on_output_line:
                    on_output_line(line.rstrip())
            process.wait()
        except BaseException:
            # Don't leave the child running (and don't let Popen.__exit__
            # block on it) if the callback or the read loop raises.
            process.kill()
            raise
        finally:
            watchdog.cancel()
            self._current_process = None
            stderr_thread.join(timeout=5)

    if timed_out.is_set():
        return ActionResult(
            output="".join(output_chunks),
            stderr="".join(stderr_chunks) or "MCP call timed out",
            exit_code=124,
            duration_ms=timeout * 1000,
        )
    return ActionResult(
        output="".join(output_chunks),
        stderr="".join(stderr_chunks),
        exit_code=process.returncode,
        duration_ms=_now_ms() - start,
    )
def _evaluate(
    self,
    state: StateConfig,
    action_result: ActionResult | None,
    ctx: InterpolationContext,
) -> EvaluationResult | None:
    """Turn an action result into a verdict and announce it.

    Without an explicit ``evaluate`` block, the evaluator is picked from the
    action mode:
    - ``mcp_tool``: the MCP-result evaluator,
    - ``prompt``: structured LLM evaluation, or an ``error`` verdict when LLM
      use is disabled,
    - anything else: exit code.

    With an explicit block, an evaluator contributed for ``evaluate.type``
    wins over the built-in ``evaluate`` dispatcher. An ``evaluate`` event
    with the verdict and evaluator details is emitted in both cases.

    Args:
        state: Configuration of the state being evaluated.
        action_result: Output of the action, or None if nothing ran.
        ctx: Context used to interpolate ``evaluate.source``.

    Returns:
        The EvaluationResult, or None when there is neither an explicit
        evaluate block nor an action result.
    """
    config = state.evaluate

    if config is None:
        if not action_result:
            return None
        mode = self._action_mode(state)
        if mode == "mcp_tool":
            verdict_result = evaluate_mcp_result(action_result.output, action_result.exit_code)
        elif mode != "prompt":
            # Shell (and any non-prompt) actions are judged by exit code alone.
            verdict_result = evaluate_exit_code(action_result.exit_code)
        elif self.fsm.llm.enabled:
            llm = self.fsm.llm
            verdict_result = evaluate_llm_structured(
                action_result.output,
                model=llm.model,
                max_tokens=llm.max_tokens,
                timeout=llm.timeout,
            )
        else:
            verdict_result = EvaluationResult(
                verdict="error",
                details={"error": "LLM evaluation disabled via --no-llm"},
            )
        self._emit(
            "evaluate",
            {"type": "default", "verdict": verdict_result.verdict, **verdict_result.details},
        )
        return verdict_result

    raw_output = action_result.output if action_result else ""
    exit_code = action_result.exit_code if action_result else 0

    eval_input = raw_output
    if config.source:
        try:
            eval_input = interpolate(config.source, ctx)
        except InterpolationError:
            # Unresolvable source: fall back to the raw action output.
            eval_input = raw_output

    if config.type in self._contributed_evaluators:
        outcome = self._contributed_evaluators[config.type](config, eval_input, exit_code, ctx)
    elif config.type == "llm_structured" and not self.fsm.llm.enabled:
        outcome = EvaluationResult(
            verdict="error",
            details={"error": "LLM evaluation disabled via --no-llm"},
        )
    else:
        outcome = evaluate(
            config=config,
            output=eval_input,
            exit_code=exit_code,
            context=ctx,
        )

    self._emit("evaluate", {"type": config.type, "verdict": outcome.verdict, **outcome.details})
    return outcome
def _route(
    self,
    state: StateConfig,
    verdict: str,
    ctx: InterpolationContext,
) -> str | None:
    """Determine the next state from an evaluation verdict.

    Resolution order:
    1. ``next`` (unconditional) - handled before this method.
    2. ``route`` table, when present. Checks the exact verdict entry, then
       the dedicated error route for an ``"error"`` verdict, then the
       default route. Shorthands are not consulted when a table exists.
    3. Shorthands ``on_yes`` / ``on_no`` / ``on_error`` / ``on_partial`` /
       ``on_blocked``, then dynamic ``on_<verdict>`` entries from
       ``extra_routes``.
    4. terminal - handled in the main loop.
    5. Otherwise None is returned (the caller handles the missing route).

    Args:
        state: State configuration.
        verdict: Verdict string from evaluation.
        ctx: Interpolation context.

    Returns:
        Resolved next-state name, or None if no route matches.
    """
    table = state.route
    if table:
        if verdict in table.routes:
            return self._resolve_route(table.routes[verdict], ctx)
        # The dedicated error route is more specific than the catch-all
        # default, so check it first. Otherwise a configured default would
        # make ``route.error`` unreachable.
        if verdict == "error" and table.error:
            return self._resolve_route(table.error, ctx)
        if table.default:
            return self._resolve_route(table.default, ctx)
        return None

    # Shorthand routing
    if verdict == "yes" and state.on_yes:
        return self._resolve_route(state.on_yes, ctx)
    if verdict == "no" and state.on_no:
        return self._resolve_route(state.on_no, ctx)
    if verdict == "error" and state.on_error:
        return self._resolve_route(state.on_error, ctx)
    if verdict == "partial" and state.on_partial:
        return self._resolve_route(state.on_partial, ctx)
    if verdict == "blocked" and state.on_blocked:
        return self._resolve_route(state.on_blocked, ctx)

    # Dynamic on_<verdict> shorthands from extra_routes
    if verdict in state.extra_routes:
        return self._resolve_route(state.extra_routes[verdict], ctx)

    return None
def _resolve_route(self, route: str, ctx: InterpolationContext) -> str:
    """Translate a route target into a concrete state name.

    The special token ``$current`` stands for the state currently being
    executed. Any other target goes through ``interpolate`` so it can
    reference runtime variables.

    Args:
        route: Route target string.
        ctx: Interpolation context.

    Returns:
        Resolved state name.
    """
    return self.current_state if route == "$current" else interpolate(route, ctx)
def _flush_pending_shell_state(self, state: StateConfig) -> None:
    """Run a pending shell-action state once before honoring a wall-clock timeout.

    BUG-1226: closes the narrow race between emitting a ``route`` event and
    ``state_enter``. Without it, handshake states (e.g. autodev's
    ``copy_broke_down``) silently drop their side effect when the timeout
    fires in that window.

    This is single-step: the action runs but its routing is not followed, so
    ``final_state`` stays as the flushed state. Errors from the action are
    ignored because the timeout is honored regardless.
    """
    assert state.action is not None  # caller only flushes states with an action
    self.iteration += 1
    self._just_routed = False
    self._emit(
        "state_enter",
        {"state": self.current_state, "iteration": self.iteration, "flushed": True},
    )
    flush_ctx = self._build_context()
    try:
        self._run_action(state.action, state, flush_ctx)
    except Exception:
        # Best effort only: the timeout proceeds whether or not this succeeded.
        return
def _action_mode(self, state: StateConfig) -> str:
    """Classify how a state's action should be executed.

    Returns one of:
    - ``"mcp_tool"``: ``action_type == "mcp_tool"``.
    - ``"prompt"``: ``action_type`` is ``"prompt"`` or ``"slash_command"``; or
      ``action_type`` is not otherwise recognised and the action starts
      with ``/``.
    - ``"shell"``: ``action_type == "shell"``, and also the final fallback.
    - ``"contributed"``: ``action_type`` is registered in
      ``self._contributed_actions``.

    Built-in action types take precedence over contributed ones, and both
    take precedence over the ``/``-prefix heuristic.
    """
    if state.action_type == "mcp_tool":
        return "mcp_tool"
    if state.action_type in ("prompt", "slash_command"):
        return "prompt"
    if state.action_type == "shell":
        return "shell"
    if state.action_type in self._contributed_actions:
        return "contributed"
    # Heuristic: a leading "/" marks a slash command, which runs in prompt mode.
    if state.action is not None and state.action.startswith("/"):
        return "prompt"
    return "shell"
def _run_action_or_route(
    self, state: StateConfig, ctx: InterpolationContext
) -> tuple[ActionResult | None, str | None]:
    """Run the state action, routing unhandled exceptions to ``on_error``.

    Returns (action_result, routed_target). ``routed_target`` is non-None
    only when the action raised AND ``state.on_error`` is defined. In that
    case ``action_result`` is None and an ``action_error`` event is emitted.
    When no on_error is set, the exception is re-raised for the top-level
    ``run()`` handler.

    The on_error target is resolved through ``_resolve_route``, the same
    path normal verdict routing uses, so special tokens such as
    ``$current`` work here too.
    """
    assert state.action is not None  # caller-guarded
    try:
        return self._run_action(state.action, state, ctx), None
    except Exception as exc:
        if not state.on_error:
            raise
        self._emit(
            "action_error",
            {
                "state": self.current_state,
                "error": str(exc),
                "route": "on_error",
            },
        )
        return None, self._resolve_route(state.on_error, ctx)
def _build_context(self) -> InterpolationContext:
    """Snapshot the executor's runtime values for variable interpolation.

    ``result`` is always None in this snapshot. ``elapsed_ms`` is the time
    since ``start_time_ms`` plus ``elapsed_offset_ms``.

    Returns:
        InterpolationContext with all runtime values.
    """
    elapsed = _now_ms() - self.start_time_ms + self.elapsed_offset_ms
    fsm = self.fsm
    return InterpolationContext(
        context=fsm.context,
        captured=self.captured,
        prev=self.prev_result,
        result=None,
        state_name=self.current_state,
        iteration=self.iteration,
        loop_name=fsm.name,
        started_at=self.started_at,
        elapsed_ms=elapsed,
    )
def _emit(self, event: str, data: dict[str, Any]) -> None:
    """Forward ``data`` to the event callback with an event name and ISO timestamp.

    The envelope keys ``event`` and ``ts`` come first. Any colliding key in
    ``data`` overrides them.
    """
    envelope: dict[str, Any] = {"event": event, "ts": _iso_now()}
    envelope.update(data)
    self.event_callback(envelope)
def _handle_rate_limit(self, state: StateConfig, state_name: str) -> tuple[bool, str | None]:
    """Handle a detected 429/rate-limit action outcome.

    Implements the two-tier retry ladder:
    1. Short-burst tier: up to ``max_rate_limit_retries`` attempts with
       exponential backoff. Each sleep is
       ``base * 2^(attempt-1) + uniform(0, base)``.
    2. Long-wait tier: walks ``rate_limit_long_wait_ladder`` with the index
       capped at the last entry, accumulating ``total_wait_seconds``.

    Routes to ``on_rate_limit_exhausted`` (falling back to ``on_error``) via
    ``_exhaust_rate_limit`` in two cases: once
    ``total_wait_seconds >= rate_limit_max_wait_seconds``, or immediately
    after the short tier when the long-wait ladder is explicitly empty.
    Indexing an empty ladder would otherwise raise IndexError.

    Returns:
        (handled, target). ``handled=True`` means the caller should return
        ``target`` directly. In-place retry uses ``state_name``; exhaustion
        uses the routed target. ``handled=False`` does not occur on the
        current path but is reserved for future extensions.
    """
    _short_max = (
        state.max_rate_limit_retries
        if state.max_rate_limit_retries is not None
        else _DEFAULT_RATE_LIMIT_RETRIES
    )
    _backoff_base = (
        state.rate_limit_backoff_base_seconds
        if state.rate_limit_backoff_base_seconds is not None
        else _DEFAULT_RATE_LIMIT_BACKOFF_BASE
    )
    _max_wait = (
        state.rate_limit_max_wait_seconds
        if state.rate_limit_max_wait_seconds is not None
        else _DEFAULT_RATE_LIMIT_MAX_WAIT_SECONDS
    )
    _ladder = (
        state.rate_limit_long_wait_ladder
        if state.rate_limit_long_wait_ladder is not None
        else _DEFAULT_RATE_LIMIT_LONG_WAIT_LADDER
    )

    record = self._rate_limit_retries.get(state_name)
    if record is None:
        record = {
            "short_retries": 0,
            "long_retries": 0,
            "total_wait_seconds": 0.0,
            "first_seen_at": time.time(),
        }
        self._rate_limit_retries[state_name] = record

    short_retries = int(record.get("short_retries", 0))
    long_retries = int(record.get("long_retries", 0))
    total_wait = float(record.get("total_wait_seconds", 0.0))

    if short_retries < _short_max:
        # Short-burst tier: exponential backoff with jitter. The budget is not
        # checked here; the short tier always advances to long-wait when spent.
        short_retries += 1
        record["short_retries"] = short_retries
        _sleep = _backoff_base * (2 ** (short_retries - 1)) + random.uniform(0, _backoff_base)
        if self._circuit is not None:
            self._circuit.record_rate_limit(_sleep)
        total_wait += self._interruptible_sleep(_sleep)
        record["total_wait_seconds"] = total_wait
        return True, state_name  # retry in place

    if not _ladder:
        # An explicitly empty ladder means there is no long-wait tier.
        return True, self._exhaust_rate_limit(state, state_name, record)

    # Long-wait tier: walk the ladder, staying on the last rung once reached.
    long_retries += 1
    record["long_retries"] = long_retries
    _idx = min(long_retries - 1, len(_ladder) - 1)
    _wait = float(_ladder[_idx])
    if self._circuit is not None:
        self._circuit.record_rate_limit(_wait)
    _tier_start = time.time()
    _deadline = _tier_start + _wait
    _total_wait_before_tier = total_wait
    total_wait += self._interruptible_sleep(
        _wait,
        on_heartbeat=lambda elapsed: self._emit(
            RATE_LIMIT_WAITING_EVENT,
            {
                "state": state_name,
                "elapsed_seconds": elapsed,
                "next_attempt_at": _deadline,
                "total_waited_seconds": _total_wait_before_tier + elapsed,
                "budget_seconds": _max_wait,
                "tier": "long_wait",
            },
        ),
    )
    record["total_wait_seconds"] = total_wait
    if total_wait >= _max_wait:
        return True, self._exhaust_rate_limit(state, state_name, record)
    return True, state_name  # retry in place
def _maybe_wait_for_circuit(self, state: StateConfig) -> None:
    """Before a prompt action, sleep until the shared rate-limit circuit recovers.

    Returns without waiting in three cases: no circuit is injected, the state
    is not in prompt mode (only prompt actions are gated here), or the
    circuit reports no recovery time. Otherwise sleeps interruptibly for
    whatever time remains until the estimated recovery.
    """
    circuit = self._circuit
    if circuit is None or self._action_mode(state) != "prompt":
        return
    recovery = circuit.get_estimated_recovery()
    if recovery is not None:
        remaining = recovery - time.time()
        if remaining > 0:
            self._interruptible_sleep(remaining)
def _interruptible_sleep(
    self,
    duration: float,
    on_heartbeat: Callable[[float], None] | None = None,
) -> float:
    """Sleep for up to ``duration`` seconds in ticks of at most 100ms.

    Exits promptly once ``self._shutdown_requested`` is set. Returns the
    actual elapsed seconds so callers can accumulate wall-clock time spent
    in rate-limit waits.

    If ``on_heartbeat`` is provided, it is called with the elapsed seconds
    roughly every ``_RATE_LIMIT_HEARTBEAT_INTERVAL`` seconds so UIs can show
    live progress during long waits. The short-tier call site intentionally
    omits the callback to keep its silent behavior.

    The remaining time is computed once per tick and checked before
    sleeping. Recomputing it after the loop test could yield a negative
    value, and ``time.sleep`` raises ValueError for negative input.
    """
    if duration <= 0:
        return 0.0
    _start = time.time()
    _deadline = _start + duration
    last_heartbeat = _start
    while not self._shutdown_requested:
        remaining = _deadline - time.time()
        if remaining <= 0:
            break
        time.sleep(min(0.1, remaining))
        if on_heartbeat is not None:
            _now = time.time()
            if _now - last_heartbeat >= _RATE_LIMIT_HEARTBEAT_INTERVAL:
                on_heartbeat(_now - _start)
                last_heartbeat = _now
    return time.time() - _start
def _exhaust_rate_limit(
    self, state: StateConfig, state_name: str, record: dict[str, Any]
) -> str | None:
    """Finalize rate-limit exhaustion for ``state_name`` and return the routed target.

    Drops the per-state retry record and picks ``on_rate_limit_exhausted``
    (or ``on_error`` as a fallback). Emits the exhausted event with the tier
    counters, bumps the consecutive-exhaustion counter, and also emits the
    storm event once that counter reaches the storm threshold.
    """
    self._rate_limit_retries.pop(state_name, None)
    target = state.on_rate_limit_exhausted or state.on_error

    short_count = int(record.get("short_retries", 0))
    long_count = int(record.get("long_retries", 0))
    self._emit(
        RATE_LIMIT_EXHAUSTED_EVENT,
        {
            "state": state_name,
            "retries": short_count + long_count,
            "short_retries": short_count,
            "long_retries": long_count,
            "total_wait_seconds": float(record.get("total_wait_seconds", 0.0)),
            "next": target,
        },
    )

    self._consecutive_rate_limit_exhaustions += 1
    streak = self._consecutive_rate_limit_exhaustions
    if streak >= _RATE_LIMIT_STORM_THRESHOLD:
        self._emit(RATE_LIMIT_STORM_EVENT, {"state": state_name, "count": streak})
    return target
def _handle_api_error(self, state: StateConfig, state_name: str) -> tuple[bool, str | None]:
    """Retry a state after an API server error using a flat backoff.

    Unlike ``_handle_rate_limit``, there is no long-wait tier. After
    ``_DEFAULT_API_ERROR_RETRIES`` attempts the per-state record is dropped
    and control falls through to normal FSM routing, so a transient
    infrastructure hiccup does not permanently redirect the loop.

    Returns:
        ``(True, state_name)`` to retry the state in place, or
        ``(False, None)`` once the retry budget is spent (the caller then
        routes on the verdict as usual).
    """
    record = self._api_error_retries.setdefault(state_name, {"retries": 0, "total_wait": 0.0})
    attempts = record["retries"]
    if attempts >= _DEFAULT_API_ERROR_RETRIES:
        self._api_error_retries.pop(state_name, None)
        self._emit("api_error_exhausted", {"state": state_name, "retries": attempts})
        return False, None

    record["retries"] = attempts + 1
    record["total_wait"] += self._interruptible_sleep(_DEFAULT_API_ERROR_BACKOFF)
    self._emit(
        "api_error_retry",
        {
            "state": state_name,
            "attempt": record["retries"],
            "backoff": _DEFAULT_API_ERROR_BACKOFF,
        },
    )
    return True, state_name
def _finish(self, terminated_by: str, error: str | None = None) -> ExecutionResult:
    """Announce loop completion and package the run's outcome.

    Emits ``loop_complete`` with the final state, iteration count and
    termination reason, then returns an ExecutionResult. Its duration
    includes ``elapsed_offset_ms``, and it carries the captured variables
    and an optional error message.
    """
    completion = {
        "final_state": self.current_state,
        "iterations": self.iteration,
        "terminated_by": terminated_by,
    }
    self._emit("loop_complete", completion)

    total_ms = _now_ms() - self.start_time_ms + self.elapsed_offset_ms
    return ExecutionResult(
        final_state=self.current_state,
        iterations=self.iteration,
        terminated_by=terminated_by,
        duration_ms=total_ms,
        captured=self.captured,
        error=error,
    )
def _handle_handoff(self, signal: DetectedSignal) -> ExecutionResult:
    """Stop the loop because a handoff signal was detected.

    Emits ``handoff_detected`` with the signal payload as the continuation.
    If a handoff handler is configured, it is invoked with the loop name and
    payload, and ``handoff_spawned`` is emitted when it reports a spawned
    process.

    Args:
        signal: The detected handoff signal.

    Returns:
        ExecutionResult marked ``terminated_by="handoff"`` with
        ``handoff=True`` and the payload as ``continuation_prompt``.
    """
    continuation = signal.payload
    self._emit(
        "handoff_detected",
        {
            "state": self.current_state,
            "iteration": self.iteration,
            "continuation": continuation,
        },
    )

    handler = self.handoff_handler
    if handler:
        outcome = handler.handle(self.fsm.name, continuation)
        spawned = outcome.spawned_process
        if spawned is not None:
            self._emit("handoff_spawned", {"pid": spawned.pid, "state": self.current_state})

    elapsed = _now_ms() - self.start_time_ms + self.elapsed_offset_ms
    return ExecutionResult(
        final_state=self.current_state,
        iterations=self.iteration,
        terminated_by="handoff",
        duration_ms=elapsed,
        captured=self.captured,
        handoff=True,
        continuation_prompt=continuation,
    )