Coverage for agentos/models/resilience.py: 38%
200 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2AgentOS v1.1.5 Resilience — 韧性层。
3Retry with jitter + Circuit Breaker + Timeout + Fallback chain + Cancellation-aware retry。
4"""
6from __future__ import annotations
8import asyncio
9import random
10import time
11import math
12from dataclasses import dataclass, field
13from enum import Enum
14from typing import Optional, Callable, Awaitable, TypeVar, Any
16T = TypeVar("T")
class CircuitState(str, Enum):
    """Lifecycle states of a circuit breaker (string-valued enum)."""

    # Normal operation.
    CLOSED = "closed"
    # Circuit tripped.
    OPEN = "open"
    # Probing after the circuit was open.
    HALF_OPEN = "half_open"
@dataclass
class CircuitBreakerConfig:
    """Tunable thresholds for a circuit breaker."""

    # Number of consecutive failures after which the circuit opens.
    failure_threshold: int = 5
    # Number of successes required while half-open before the circuit recovers.
    success_threshold: int = 2
    # How long the circuit stays open, in seconds.
    timeout: float = 60.0
    # Maximum number of probe requests while half-open.
    half_open_max_requests: int = 1
    # Statistics window.
    track_duration: float = 300.0
@dataclass
class CircuitBreakerStats:
    """Runtime statistics snapshot of a circuit breaker."""

    # Only required field: the breaker state this snapshot describes.
    state: CircuitState

    # Counters; all start at zero.
    failure_count: int = 0
    success_count: int = 0

    # Timestamps; 0.0 is the initial value.
    last_failure_time: float = 0.0
    last_success_time: float = 0.0
    opened_at: float = 0.0

    # Lifetime totals.
    total_failures: int = 0
    total_successes: int = 0
class CircuitBreaker:
    """Circuit breaker: trips after repeated failures and recovers via probes.

    State machine:

    * CLOSED: every call is admitted; once ``failure_threshold`` failures
      accumulate without an intervening success the circuit opens.
    * OPEN: calls are rejected with ``CircuitBreakerOpenError`` until
      ``timeout`` seconds have passed since the circuit opened.
    * HALF_OPEN: at most ``half_open_max_requests`` probe calls may be in
      flight at once; ``success_threshold`` successful probes close the
      circuit, and any failed probe re-opens it.
    """

    def __init__(self, name: str, config: CircuitBreakerConfig | None = None):
        """Create a closed breaker called ``name`` using ``config`` (or defaults)."""
        self.name = name
        self.config = config or CircuitBreakerConfig()
        self.state = CircuitState.CLOSED
        self._failure_count: int = 0
        self._success_count: int = 0
        self._last_failure_time: float = 0.0
        self._opened_at: float = 0.0
        # Number of probe calls currently running while HALF_OPEN.
        self._half_open_in_flight: int = 0
        self._lock = asyncio.Lock()
        self._stats = CircuitBreakerStats(state=CircuitState.CLOSED)

    async def call(self, fn: Callable[..., Awaitable[T]], *args, **kwargs) -> T:
        """Invoke ``fn(*args, **kwargs)`` through the breaker.

        Returns:
            Whatever ``fn`` returns.

        Raises:
            CircuitBreakerOpenError: the breaker does not admit the call.
            Exception: any exception raised by ``fn`` is recorded as a failure
                and re-raised with its original traceback.
        """
        async with self._lock:
            if not self._allow_request():
                raise CircuitBreakerOpenError(f"Circuit {self.name} is OPEN")
            # Remember whether this call is a half-open probe so the in-flight
            # counter is released exactly once, whatever the outcome.
            is_probe = self.state == CircuitState.HALF_OPEN
            if is_probe:
                self._half_open_in_flight += 1

        try:
            result = await fn(*args, **kwargs)
            await self._on_success()
            return result
        except Exception:
            await self._on_failure()
            raise
        finally:
            # Runs for BaseException too (e.g. task cancellation), which is
            # recorded as neither success nor failure.  No await happens here,
            # so the update cannot interleave with other tasks.
            if is_probe:
                self._half_open_in_flight = max(0, self._half_open_in_flight - 1)

    def _allow_request(self) -> bool:
        """Decide whether a call may proceed; may move OPEN -> HALF_OPEN."""
        if self.state == CircuitState.OPEN:
            if time.time() - self._opened_at < self.config.timeout:
                return False
            # Open period elapsed: start a fresh probing round.
            self.state = CircuitState.HALF_OPEN
            self._success_count = 0
            self._half_open_in_flight = 0
            self._stats.state = self.state
            return True

        if self.state == CircuitState.HALF_OPEN:
            # Gate on concurrent probes, not on accumulated successes.  Gating
            # on the success counter would reject every call once it reached
            # half_open_max_requests, leaving the breaker stuck in HALF_OPEN
            # whenever success_threshold > half_open_max_requests.
            return self._half_open_in_flight < self.config.half_open_max_requests

        return True

    async def _on_success(self):
        """Record a successful call and close the circuit when probes suffice."""
        async with self._lock:
            self._stats.total_successes += 1
            self._stats.last_success_time = time.time()

            if self.state == CircuitState.HALF_OPEN:
                self._success_count += 1
                if self._success_count >= self.config.success_threshold:
                    self.state = CircuitState.CLOSED
                    self._failure_count = 0
            else:
                # A success outside HALF_OPEN clears the failure streak.
                self._failure_count = 0

            self._stats.failure_count = self._failure_count
            self._stats.success_count = self._success_count
            self._stats.state = self.state

    async def _on_failure(self):
        """Record a failed call and open the circuit when warranted."""
        async with self._lock:
            now = time.time()
            self._failure_count += 1
            self._last_failure_time = now
            self._stats.failure_count = self._failure_count
            self._stats.total_failures += 1
            self._stats.last_failure_time = now

            # A failed probe re-opens the circuit immediately; otherwise the
            # failure count has to reach the configured threshold.
            probe_failed = self.state == CircuitState.HALF_OPEN
            if probe_failed or self._failure_count >= self.config.failure_threshold:
                self.state = CircuitState.OPEN
                self._opened_at = now
                self._success_count = 0
                self._stats.opened_at = now
                self._stats.success_count = 0

            self._stats.state = self.state

    @property
    def stats(self) -> CircuitBreakerStats:
        """Return a fresh snapshot of the breaker's counters and timestamps."""
        return CircuitBreakerStats(
            state=self.state,
            failure_count=self._failure_count,
            success_count=self._success_count,
            last_failure_time=self._last_failure_time,
            last_success_time=self._stats.last_success_time,
            opened_at=self._opened_at,
            total_failures=self._stats.total_failures,
            total_successes=self._stats.total_successes,
        )

    def reset(self):
        """Force the breaker back to CLOSED and clear the running counters.

        Lifetime totals and timestamps are kept.
        """
        self.state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._half_open_in_flight = 0
        self._stats.state = self.state
        self._stats.failure_count = 0
        self._stats.success_count = 0
class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit breaker is open."""
148# ── Retry with Jitter ──────────────────────────────────────────────────────
@dataclass
class RetryConfig:
    """Retry policy settings."""

    max_retries: int = 3
    # Base delay in seconds.
    base_delay: float = 1.0
    # Maximum delay.
    max_delay: float = 60.0
    # Back-off multiplier.
    backoff_multiplier: float = 2.0
    # Whether to add jitter.
    jitter: bool = True
    # Jitter ratio.
    jitter_factor: float = 0.1
    # Exception types that trigger a retry.
    retry_on: tuple[type[Exception], ...] = (Exception,)
class CancellationSource(str, Enum):
    """Origin of a cancellation: user-initiated versus system-initiated."""

    # Cancelled by the user: not retried.
    USER = "user"
    # Cancelled at system level (timeout, circuit breaker, ...): retried per configuration.
    SYSTEM = "system"
class CancelledError(Exception):
    """Cancellation exception that records where the cancellation came from.

    Attributes:
        source: the ``CancellationSource`` of this cancellation; defaults to
            ``CancellationSource.SYSTEM``.
    """

    def __init__(self, message: str, source: CancellationSource = CancellationSource.SYSTEM):
        self.source = source
        super().__init__(message)
class RetryExhaustedError(Exception):
    """Raised when every retry attempt has failed.

    Attributes:
        attempts: the attempt count passed to the constructor.
        last_error: the exception passed to the constructor.
    """

    def __init__(self, attempts: int, last_error: Exception):
        self.attempts = attempts
        self.last_error = last_error
        summary = f"Retry exhausted after {attempts} attempts. Last error: {last_error}"
        super().__init__(summary)
async def retry_with_backoff(
    fn: Callable[..., Awaitable[T]],
    *args,
    config: RetryConfig | None = None,
    circuit_breaker: CircuitBreaker | None = None,
    on_retry: Callable[[int, Exception, float], None] | None = None,
    **kwargs,
) -> T:
    """Call ``fn`` with exponential back-off and optional jitter between retries.

    Args:
        fn: coroutine function to invoke as ``fn(*args, **kwargs)``.
        config: retry policy; a default ``RetryConfig`` is used when omitted.
        circuit_breaker: when given, every attempt goes through its ``call``.
        on_retry: optional callback invoked before each sleep with
            ``(failed_attempt_number, error, delay_seconds)``; attempt numbers
            start at 1.

    Returns:
        The value returned by the first successful attempt.

    Raises:
        CircuitBreakerOpenError: propagated immediately; an open circuit is
            never retried.
        CancelledError: propagated immediately when its source is
            ``CancellationSource.USER``; other sources are retried.
        RetryExhaustedError: every attempt failed; it is chained to the last
            error via ``__cause__``.
    """
    cfg = config or RetryConfig()
    # A negative max_retries is treated as zero so fn is always tried once.
    total_attempts = max(cfg.max_retries, 0) + 1

    def _next_delay(retry_index: int) -> float:
        """Return the sleep before retry ``retry_index`` (zero-based)."""
        try:
            raw = cfg.base_delay * (cfg.backoff_multiplier ** retry_index)
        except OverflowError:
            # Float exponentiation overflowed; the cap applies anyway.
            raw = cfg.max_delay
        delay = min(raw, cfg.max_delay)
        if cfg.jitter:
            delay *= 1 + random.uniform(-cfg.jitter_factor, cfg.jitter_factor)
            # Keep the 0.1 s floor for jittered delays, then re-apply the cap
            # so positive jitter can never push the delay above max_delay.
            delay = min(max(0.1, delay), cfg.max_delay)
        return delay

    failed_attempts = 0
    while True:
        try:
            if circuit_breaker:
                return await circuit_breaker.call(fn, *args, **kwargs)
            return await fn(*args, **kwargs)
        except CircuitBreakerOpenError:
            raise  # an open circuit is not retried
        except CancelledError as exc:
            if exc.source == CancellationSource.USER:
                raise  # user cancellation is not retried
            # System-level cancellation follows the normal retry policy.
            error: Exception = exc
        except cfg.retry_on as exc:
            error = exc

        failed_attempts += 1
        if failed_attempts >= total_attempts:
            raise RetryExhaustedError(total_attempts, error) from error

        delay = _next_delay(failed_attempts - 1)
        if on_retry:
            on_retry(failed_attempts, error, delay)
        await asyncio.sleep(delay)
236# ── Timeout ─────────────────────────────────────────────────────────────────
class TimeoutError(Exception):
    """Timeout exception defined by this module.

    NOTE(review): the name shadows the built-in ``TimeoutError`` inside this
    module; this class derives directly from ``Exception``.
    """
async def with_timeout(
    fn: Callable[..., Awaitable[T]],
    *args,
    timeout: float = 120.0,
    **kwargs,
) -> T:
    """Await ``fn(*args, **kwargs)``, giving up after ``timeout`` seconds.

    Returns:
        The awaited result of ``fn``.

    Raises:
        TimeoutError: ``asyncio.wait_for`` reported a timeout.
    """
    try:
        outcome = await asyncio.wait_for(fn(*args, **kwargs), timeout=timeout)
    except asyncio.TimeoutError:
        raise TimeoutError(f"Operation timed out after {timeout}s")
    else:
        return outcome
256# ── Fallback Chain ──────────────────────────────────────────────────────────
async def with_fallback(
    primary: Callable[..., Awaitable[T]],
    fallbacks: list[Callable[..., Awaitable[T]]],
    *args, **kwargs,
) -> T:
    """Try ``primary`` first, then each fallback in order, with the same arguments.

    Returns:
        The result of the first callable that completes without raising
        ``Exception``.

    Raises:
        FallbackExhaustedError: every callable raised; it carries the collected
            exceptions in call order.
    """
    failures: list[Exception] = []

    try:
        return await primary(*args, **kwargs)
    except Exception as exc:
        failures.append(exc)

    for alternative in fallbacks:
        try:
            return await alternative(*args, **kwargs)
        except Exception as exc:
            failures.append(exc)

    raise FallbackExhaustedError(failures)
class FallbackExhaustedError(Exception):
    """Raised when the primary callable and every fallback have failed.

    Attributes:
        errors: the list of exceptions passed to the constructor.
    """

    def __init__(self, errors: list[Exception]):
        # Only the first three errors are rendered into the message.
        preview = "; ".join(map(str, errors[:3]))
        super().__init__(f"All {len(errors)} attempts failed: " + preview)
        self.errors = errors
290# ── Composite Resilience ────────────────────────────────────────────────────
@dataclass
class ResilienceConfig:
    """Top-level resilience configuration."""

    # Retry policy; each instance gets its own default RetryConfig.
    retry: RetryConfig = field(default_factory=RetryConfig)
    # Circuit breaker settings; None by default.
    circuit_breaker: CircuitBreakerConfig | None = None
    # Per-call timeout value.
    timeout: float = 120.0
class ResilientCall:
    """Composite resilient caller: retry + circuit breaker + timeout.

    NOTE(review): no fallback chain is wired in here; only ``with_timeout``
    and ``retry_with_backoff`` are used.
    """

    def __init__(self, config: ResilienceConfig | None = None):
        """Store the retry policy and timeout; build a breaker only if configured."""
        settings = config or ResilienceConfig()
        self.retry_config = settings.retry
        self.timeout = settings.timeout
        self._breaker: CircuitBreaker | None = (
            CircuitBreaker("default", settings.circuit_breaker)
            if settings.circuit_breaker
            else None
        )

    async def call(self, fn: Callable[..., Awaitable[T]], *args, **kwargs) -> T:
        """Run ``fn(*args, **kwargs)`` under the timeout, retrying per the policy.

        Each attempt is wrapped by ``with_timeout``; the attempts are driven by
        ``retry_with_backoff`` together with this instance's breaker, if any.
        """

        async def guarded_attempt():
            return await with_timeout(fn, *args, timeout=self.timeout, **kwargs)

        outcome = await retry_with_backoff(
            guarded_attempt,
            config=self.retry_config,
            circuit_breaker=self._breaker,
        )
        return outcome
# ── Compat exports ──
# retry_with_backoff, with_timeout and with_fallback are implemented as
# coroutine functions earlier in this module.  No placeholder definitions
# belong here: re-defining those names at the end of the module rebinds them
# to synchronous no-ops that return None, so every `await` on them (for
# example in ResilientCall.call) fails with TypeError.