Coverage for little_loops / fsm / rate_limit_circuit.py: 83%

75 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""Shared circuit-breaker state file for cross-worktree 429 coordination. 

2 

3Provides :class:`RateLimitCircuit`: a file-backed record of the most recent 

4rate-limit event (observed backoff window and attempt count) that parallel 

5worktrees can read to avoid hammering an already-rate-limited endpoint. 

6 

7The state file is a small JSON document guarded by an ``fcntl.flock``-held 

8sidecar lock, written atomically via ``tempfile.mkstemp`` + ``os.replace``. 

9Detection of a 429 stays in the executor 

10(see :func:`little_loops.issue_lifecycle.classify_failure`); this module only 

11owns the backoff-write side. 

12""" 

13 

14from __future__ import annotations 

15 

16import fcntl 

17import json 

18import logging 

19import os 

20import tempfile 

21import time 

22from pathlib import Path 

23from typing import Any 

24 

logger = logging.getLogger(__name__)

# Age, in seconds, beyond which a stored entry's ``last_seen`` timestamp is
# considered stale by ``RateLimitCircuit.is_stale`` (3600 s = 1 hour).
STALE_THRESHOLD_SECONDS = 3600.0

28 

29 

30class RateLimitCircuit: 

31 """File-backed circuit-breaker for shared 429 backoff coordination. 

32 

33 Constructor accepts an absolute path to the state file 

34 (e.g. ``.loops/tmp/rate-limit-circuit.json``). The default path 

35 source-of-truth is 

36 :attr:`little_loops.config.automation.RateLimitsConfig.circuit_breaker_path`; 

37 this module does not import config. 

38 """ 

39 

40 def __init__(self, path: Path) -> None: 

41 self.path = Path(path) 

42 self._lock_path = self.path.with_suffix(self.path.suffix + ".lock") 

43 

44 def record_rate_limit(self, backoff_seconds: float) -> None: 

45 """Record a rate-limit event with the observed backoff window. 

46 

47 Increments ``attempts``, advances ``last_seen`` to now, and extends 

48 ``estimated_recovery_at`` monotonically so concurrent observers do 

49 not shrink an in-flight backoff window. 

50 """ 

51 self.path.parent.mkdir(parents=True, exist_ok=True) 

52 with open(self._lock_path, "w") as lock_fd: 

53 fcntl.flock(lock_fd, fcntl.LOCK_EX) 

54 

55 now = time.time() 

56 proposed_recovery = now + float(backoff_seconds) 

57 current = self._read_unlocked() 

58 if current is None: 

59 record = { 

60 "first_seen": now, 

61 "last_seen": now, 

62 "attempts": 1, 

63 "estimated_recovery_at": proposed_recovery, 

64 } 

65 else: 

66 record = { 

67 "first_seen": current.get("first_seen", now), 

68 "last_seen": now, 

69 "attempts": int(current.get("attempts", 0)) + 1, 

70 "estimated_recovery_at": max( 

71 float(current.get("estimated_recovery_at", 0.0)), 

72 proposed_recovery, 

73 ), 

74 } 

75 self._write_atomic(record) 

76 

77 def get_estimated_recovery(self) -> float | None: 

78 """Return epoch timestamp of estimated recovery, or None if stale/absent.""" 

79 if self.is_stale(): 

80 return None 

81 current = self._read_unlocked() 

82 if current is None: 

83 return None 

84 recovery = current.get("estimated_recovery_at") 

85 return float(recovery) if recovery is not None else None 

86 

87 def is_stale(self) -> bool: 

88 """True if the stored entry's ``last_seen`` is >1h ago (or file absent).""" 

89 current = self._read_unlocked() 

90 if current is None: 

91 return False 

92 last_seen = current.get("last_seen") 

93 if last_seen is None: 

94 return True 

95 return (time.time() - float(last_seen)) > STALE_THRESHOLD_SECONDS 

96 

97 def clear(self) -> None: 

98 """Remove the state file. No-op if already absent.""" 

99 try: 

100 self.path.unlink() 

101 except FileNotFoundError: 

102 pass 

103 

104 def _read_unlocked(self) -> dict[str, Any] | None: 

105 """Read the state file, treating absent/corrupt as None.""" 

106 if not self.path.exists(): 

107 return None 

108 try: 

109 raw = self.path.read_text() 

110 except FileNotFoundError: 

111 return None 

112 if not raw: 

113 return None 

114 try: 

115 data = json.loads(raw) 

116 except json.JSONDecodeError: 

117 logger.warning("Corrupted circuit file %s; treating as absent", self.path) 

118 return None 

119 return data if isinstance(data, dict) else None 

120 

121 def _write_atomic(self, data: dict[str, Any]) -> None: 

122 """Atomically write ``data`` as JSON to ``self.path``.""" 

123 payload = json.dumps(data, indent=2) 

124 tmp_fd, tmp_path = tempfile.mkstemp(dir=self.path.parent, suffix=".tmp") 

125 try: 

126 with os.fdopen(tmp_fd, "w") as f: 

127 f.write(payload) 

128 os.replace(tmp_path, self.path) 

129 except Exception: 

130 try: 

131 os.unlink(tmp_path) 

132 except FileNotFoundError: 

133 pass 

134 raise