Coverage for little_loops / fsm / rate_limit_circuit.py: 83%
75 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
"""Shared circuit-breaker state file for cross-worktree 429 coordination.

Provides :class:`RateLimitCircuit`: a file-backed record of the most recent
rate-limit event (observed backoff window and attempt count) that parallel
worktrees can read to avoid hammering an already-rate-limited endpoint.

The state file is a small JSON document guarded by an ``fcntl.flock``-held
sidecar lock, written atomically via ``tempfile.mkstemp`` + ``os.replace``.
Detection of a 429 stays in the executor
(see :func:`little_loops.issue_lifecycle.classify_failure`); this module only
owns the backoff-write side.
"""
14from __future__ import annotations
16import fcntl
17import json
18import logging
19import os
20import tempfile
21import time
22from pathlib import Path
23from typing import Any
logger = logging.getLogger(__name__)

# Maximum age, in seconds since a record's ``last_seen``, before the record
# is considered stale and ignored by readers.
STALE_THRESHOLD_SECONDS = 3600.0


class RateLimitCircuit:
    """File-backed circuit-breaker for shared 429 backoff coordination.

    Constructor accepts an absolute path to the state file
    (e.g. ``.loops/tmp/rate-limit-circuit.json``). The default path
    source-of-truth is
    :attr:`little_loops.config.automation.RateLimitsConfig.circuit_breaker_path`;
    this module does not import config.

    The state file holds a single JSON object with the keys ``first_seen``,
    ``last_seen``, ``attempts`` and ``estimated_recovery_at`` (timestamps are
    ``time.time()`` epoch seconds).
    """

    def __init__(self, path: Path) -> None:
        """Bind the circuit to ``path``; no file is created or read here.

        Args:
            path: Location of the JSON state file. The sidecar lock file is
                the same path with ``.lock`` appended to its suffix.
        """
        self.path = Path(path)
        self._lock_path = self.path.with_suffix(self.path.suffix + ".lock")

    def record_rate_limit(self, backoff_seconds: float) -> None:
        """Record a rate-limit event with the observed backoff window.

        Increments ``attempts``, advances ``last_seen`` to now, and extends
        ``estimated_recovery_at`` monotonically so concurrent observers do
        not shrink an in-flight backoff window.

        The read-modify-write runs while holding an exclusive ``flock`` on
        the sidecar lock file; the parent directory is created if missing.

        Args:
            backoff_seconds: Backoff window observed by the caller, in
                seconds from now.
        """
        self.path.parent.mkdir(parents=True, exist_ok=True)
        with open(self._lock_path, "w") as lock_fd:
            # The lock is released when ``lock_fd`` is closed at the end of
            # the ``with`` block.
            fcntl.flock(lock_fd, fcntl.LOCK_EX)

            now = time.time()
            proposed_recovery = now + float(backoff_seconds)
            current = self._read_unlocked()
            record: dict[str, Any]
            if current is None:
                # No usable prior record: start a fresh one.
                record = {
                    "first_seen": now,
                    "last_seen": now,
                    "attempts": 1,
                    "estimated_recovery_at": proposed_recovery,
                }
            else:
                record = {
                    "first_seen": current.get("first_seen", now),
                    "last_seen": now,
                    "attempts": int(current.get("attempts", 0)) + 1,
                    # Never move the recovery estimate earlier than what is
                    # already stored.
                    "estimated_recovery_at": max(
                        float(current.get("estimated_recovery_at", 0.0)),
                        proposed_recovery,
                    ),
                }
            self._write_atomic(record)

    def get_estimated_recovery(self) -> float | None:
        """Return epoch timestamp of estimated recovery, or None if stale/absent.

        Returns:
            The stored ``estimated_recovery_at`` as a float, or ``None`` when
            the record is stale, the file is absent or unreadable, or the key
            is missing.
        """
        if self.is_stale():
            return None
        # Re-read: the file may have been removed or replaced since the
        # staleness check, so absence still has to be handled here.
        current = self._read_unlocked()
        if current is None:
            return None
        recovery = current.get("estimated_recovery_at")
        return float(recovery) if recovery is not None else None

    def is_stale(self) -> bool:
        """True if the stored entry's ``last_seen`` is >1h ago (or file absent).

        A missing, empty, corrupt or non-object state file, and a record with
        no ``last_seen`` key, all count as stale: there is nothing current to
        act on.
        """
        current = self._read_unlocked()
        if current is None:
            # Absent/unreadable state is stale, as documented above.
            return True
        last_seen = current.get("last_seen")
        if last_seen is None:
            return True
        return (time.time() - float(last_seen)) > STALE_THRESHOLD_SECONDS

    def clear(self) -> None:
        """Remove the state file. No-op if already absent.

        The sidecar lock file is left in place.
        """
        try:
            self.path.unlink()
        except FileNotFoundError:
            pass

    def _read_unlocked(self) -> dict[str, Any] | None:
        """Read the state file, treating absent/corrupt as None.

        Does not take the sidecar lock; callers that need a consistent
        read-modify-write must hold it themselves.

        Returns:
            The decoded JSON object, or ``None`` if the file is missing,
            empty, not valid JSON (a warning is logged), or not a JSON
            object.
        """
        # Read directly instead of checking existence first: the file can
        # disappear between a check and the read.
        try:
            raw = self.path.read_text(encoding="utf-8")
        except FileNotFoundError:
            return None
        if not raw:
            return None
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            logger.warning("Corrupted circuit file %s; treating as absent", self.path)
            return None
        return data if isinstance(data, dict) else None

    def _write_atomic(self, data: dict[str, Any]) -> None:
        """Atomically write ``data`` as JSON to ``self.path``.

        The payload is written to a temporary file in the same directory and
        then moved over the target with ``os.replace``. If anything fails,
        the temporary file is removed and the error is re-raised.
        """
        payload = json.dumps(data, indent=2)
        tmp_fd, tmp_path = tempfile.mkstemp(dir=self.path.parent, suffix=".tmp")
        try:
            with os.fdopen(tmp_fd, "w", encoding="utf-8") as f:
                f.write(payload)
            os.replace(tmp_path, self.path)
        except BaseException:
            # Clean up on interrupts as well as ordinary errors; the
            # exception is always propagated.
            try:
                os.unlink(tmp_path)
            except FileNotFoundError:
                pass
            raise