Coverage for agentos/models/resilience.py: 38%

200 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2AgentOS v1.1.5 Resilience — 韧性层。 

3Retry with jitter + Circuit Breaker + Timeout + Fallback chain + Cancellation-aware retry。 

4""" 

5 

6from __future__ import annotations 

7 

8import asyncio 

9import random 

10import time 

11import math 

12from dataclasses import dataclass, field 

13from enum import Enum 

14from typing import Optional, Callable, Awaitable, TypeVar, Any 

15 

# Generic result type of the awaitables wrapped by the helpers in this module.
T = TypeVar("T")


class CircuitState(str, Enum):
    """Lifecycle states of a circuit breaker.

    Because the enum mixes in ``str``, each member compares equal to its
    string value (e.g. ``CircuitState.OPEN == "open"``).
    """

    CLOSED = "closed"        # normal operation: requests pass through
    OPEN = "open"            # tripped: requests are rejected
    HALF_OPEN = "half_open"  # probing: trial requests decide whether to close again

26 

27 

@dataclass
class CircuitBreakerConfig:
    """Tunable thresholds for a circuit breaker."""

    # Consecutive failures that trip the breaker to OPEN.
    failure_threshold: int = 5
    # Successes needed while HALF_OPEN before the breaker returns to CLOSED.
    success_threshold: int = 2
    # Seconds the breaker stays OPEN before a trial request is let through.
    timeout: float = 60.0
    # Limit on trial requests while HALF_OPEN.
    half_open_max_requests: int = 1
    # Statistics window in seconds.
    # NOTE(review): not read by any code visible in this module — confirm before relying on it.
    track_duration: float = 300.0

37 

38 

@dataclass
class CircuitBreakerStats:
    """Point-in-time snapshot of a circuit breaker's counters and timestamps."""

    state: CircuitState
    # Current consecutive-failure / success counters.
    failure_count: int = 0
    success_count: int = 0
    # Timestamps as produced by ``time.time()``; 0.0 means "never happened".
    last_failure_time: float = 0.0
    last_success_time: float = 0.0
    opened_at: float = 0.0
    # Lifetime totals.
    total_failures: int = 0
    total_successes: int = 0

51 

52 

class CircuitBreaker:
    """Circuit breaker that trips after consecutive failures and recovers on its own.

    State transitions:

    * CLOSED -> OPEN after ``config.failure_threshold`` consecutive failures.
    * OPEN -> HALF_OPEN on the first request made at least ``config.timeout``
      seconds after the breaker opened.
    * HALF_OPEN -> CLOSED after ``config.success_threshold`` successful calls;
      any failure while HALF_OPEN re-opens the breaker immediately.

    While HALF_OPEN, at most ``config.half_open_max_requests`` probe calls
    (never fewer than 1) may be in flight at the same time.
    """

    def __init__(self, name: str, config: CircuitBreakerConfig | None = None):
        self.name = name
        self.config = config or CircuitBreakerConfig()
        self.state = CircuitState.CLOSED
        self._failure_count: int = 0  # consecutive failures
        self._success_count: int = 0  # successes since entering HALF_OPEN
        self._last_failure_time: float = 0.0
        self._opened_at: float = 0.0
        self._half_open_in_flight: int = 0  # probes currently running while HALF_OPEN
        self._lock = asyncio.Lock()
        self._stats = CircuitBreakerStats(state=CircuitState.CLOSED)

    async def call(self, fn: Callable[..., Awaitable[T]], *args, **kwargs) -> T:
        """Await ``fn(*args, **kwargs)`` through the breaker.

        Returns:
            Whatever ``fn`` returns.

        Raises:
            CircuitBreakerOpenError: the breaker is currently rejecting requests.
            Exception: anything ``fn`` raises, re-raised after being recorded
                as a failure. ``asyncio.CancelledError`` derives from
                ``BaseException`` (Python 3.8+), so cancellation is not
                recorded as a failure.
        """
        # Hold the lock only for the admission decision. It must be released
        # before _on_success/_on_failure re-acquire it: asyncio.Lock is not
        # reentrant.
        async with self._lock:
            if not self._allow_request():
                raise CircuitBreakerOpenError(f"Circuit {self.name} is OPEN")
            is_probe = self.state == CircuitState.HALF_OPEN
            if is_probe:
                self._half_open_in_flight += 1

        try:
            result = await fn(*args, **kwargs)
        except Exception:
            await self._on_failure()
            raise
        else:
            await self._on_success()
            return result
        finally:
            # Free the probe slot however the call ended (including cancellation).
            if is_probe:
                self._half_open_in_flight -= 1

    def _allow_request(self) -> bool:
        """Return True if a request may proceed; may move OPEN -> HALF_OPEN.

        Must be called with ``self._lock`` held.
        """
        if self.state == CircuitState.OPEN:
            if time.time() - self._opened_at < self.config.timeout:
                return False
            # Cool-down elapsed: start probing.
            self.state = CircuitState.HALF_OPEN
            self._success_count = 0
            self._stats.state = self.state

        if self.state == CircuitState.HALF_OPEN:
            # Limit *concurrent* probes rather than accumulated successes:
            # capping on successes would reject every request once
            # half_open_max_requests successes were reached, and with
            # success_threshold > half_open_max_requests (the defaults) the
            # breaker could then never close. At least one probe is always
            # allowed so the breaker cannot wedge in HALF_OPEN.
            limit = max(1, self.config.half_open_max_requests)
            return self._half_open_in_flight < limit

        return True

    async def _on_success(self):
        """Record a successful call; close the breaker once HALF_OPEN probes suffice."""
        async with self._lock:
            self._stats.total_successes += 1
            self._stats.last_success_time = time.time()

            if self.state == CircuitState.HALF_OPEN:
                self._success_count += 1
                if self._success_count >= self.config.success_threshold:
                    self.state = CircuitState.CLOSED
                    self._failure_count = 0
            else:
                # Only *consecutive* failures count toward tripping the breaker.
                self._failure_count = 0

            self._stats.state = self.state

    async def _on_failure(self):
        """Record a failed call; open the breaker on threshold or any HALF_OPEN failure."""
        async with self._lock:
            now = time.time()
            self._failure_count += 1
            self._last_failure_time = now
            self._stats.failure_count = self._failure_count
            self._stats.total_failures += 1
            self._stats.last_failure_time = now

            # A failed probe means the dependency is still unhealthy, regardless
            # of how the consecutive-failure counter currently stands.
            if (
                self.state == CircuitState.HALF_OPEN
                or self._failure_count >= self.config.failure_threshold
            ):
                self.state = CircuitState.OPEN
                self._opened_at = now
                self._stats.opened_at = now

            self._stats.state = self.state

    @property
    def stats(self) -> CircuitBreakerStats:
        """Return a fresh snapshot of the breaker's counters and timestamps."""
        return CircuitBreakerStats(
            state=self.state,
            failure_count=self._failure_count,
            success_count=self._success_count,
            last_failure_time=self._last_failure_time,
            last_success_time=self._stats.last_success_time,
            opened_at=self._opened_at,
            total_failures=self._stats.total_failures,
            total_successes=self._stats.total_successes,
        )

    def reset(self):
        """Force the breaker back to CLOSED and clear the consecutive counters.

        Lifetime totals (``total_failures`` / ``total_successes``) are kept.
        """
        self.state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._stats.state = self.state

141 

142 

class CircuitBreakerOpenError(Exception):
    """Raised by ``CircuitBreaker.call`` when the breaker refuses a request."""

146 

147 

148# ── Retry with Jitter ────────────────────────────────────────────────────── 

149 

@dataclass
class RetryConfig:
    """Retry policy used by ``retry_with_backoff``."""

    # Retries after the first attempt: up to max_retries + 1 calls in total.
    max_retries: int = 3
    # Delay before the first retry, in seconds; grows geometrically afterwards.
    base_delay: float = 1.0
    # Cap applied to the exponential backoff delay, in seconds.
    max_delay: float = 60.0
    # Factor the delay is multiplied by on each further retry.
    backoff_multiplier: float = 2.0
    # When enabled, each delay is scaled by a random factor
    # in [1 - jitter_factor, 1 + jitter_factor].
    jitter: bool = True
    jitter_factor: float = 0.1
    # Exception types that trigger a retry.
    retry_on: tuple[type[Exception], ...] = (Exception,)

160 

161 

class CancellationSource(str, Enum):
    """Origin of a cancellation; ``retry_with_backoff`` uses it to decide whether to retry."""

    USER = "user"      # user explicitly cancelled: never retried
    SYSTEM = "system"  # system-level cancellation (timeout, breaker, ...): retried per config

167 

168 

class CancelledError(Exception):
    """Cancellation exception tagged with the ``CancellationSource`` that caused it.

    NOTE: this is a plain ``Exception`` and is unrelated to
    ``asyncio.CancelledError`` (which derives from ``BaseException``).

    Args:
        message: human-readable reason, stored as the exception message.
        source: who cancelled; defaults to ``CancellationSource.SYSTEM``.
    """

    def __init__(self, message: str, source: CancellationSource = CancellationSource.SYSTEM):
        Exception.__init__(self, message)
        self.source = source

175 

176 

class RetryExhaustedError(Exception):
    """Raised by ``retry_with_backoff`` once every allowed attempt has failed.

    Attributes:
        attempts: number of attempts reported by the caller.
        last_error: the exception raised by the final attempt.
    """

    def __init__(self, attempts: int, last_error: Exception):
        self.attempts = attempts
        self.last_error = last_error
        message = f"Retry exhausted after {attempts} attempts. Last error: {last_error}"
        super().__init__(message)

184 

185 

async def retry_with_backoff(
    fn: Callable[..., Awaitable[T]],
    *args,
    config: RetryConfig | None = None,
    circuit_breaker: CircuitBreaker | None = None,
    on_retry: Callable[[int, Exception, float], None] | None = None,
    **kwargs,
) -> T:
    """Call ``fn(*args, **kwargs)``, retrying with exponential backoff and jitter.

    Up to ``config.max_retries + 1`` attempts are made. Before retry ``n``
    (1-based) the delay is ``base_delay * backoff_multiplier ** (n - 1)``,
    capped at ``max_delay`` (also after jitter), and never below 0.1s.
    User cancellations are never retried; system cancellations follow the
    retry policy.

    Args:
        fn: async callable to invoke.
        config: retry policy; defaults to ``RetryConfig()``.
        circuit_breaker: if given, every attempt goes through ``circuit_breaker.call``.
        on_retry: called as ``on_retry(retry_number, error, delay)`` before each sleep.

    Returns:
        The result of the first successful attempt.

    Raises:
        CircuitBreakerOpenError: the breaker rejected an attempt (never retried).
        CancelledError: a USER-sourced cancellation (never retried).
        RetryExhaustedError: every attempt failed with a retryable error; the
            final error is chained as ``__cause__``.
        Exception: any other error (not in ``config.retry_on`` and not a
            SYSTEM cancellation) propagates unchanged on first occurrence.
    """
    cfg = config or RetryConfig()
    last_error: Exception | None = None

    def _delay_for(attempt_num: int) -> float:
        # Exponential backoff, capped at max_delay.
        delay = min(cfg.base_delay * (cfg.backoff_multiplier ** attempt_num), cfg.max_delay)
        if cfg.jitter:
            delay *= 1 + random.uniform(-cfg.jitter_factor, cfg.jitter_factor)
            # Re-apply the cap so positive jitter cannot exceed max_delay.
            delay = min(delay, cfg.max_delay)
        # 0.1s floor keeps retries from spinning.
        return max(0.1, delay)

    for attempt in range(cfg.max_retries + 1):
        try:
            if circuit_breaker is not None:
                return await circuit_breaker.call(fn, *args, **kwargs)
            return await fn(*args, **kwargs)
        except CircuitBreakerOpenError:
            raise  # an open breaker is not retried
        except CancelledError as e:
            if e.source == CancellationSource.USER:
                raise  # user cancellation is never retried
            last_error = e  # system cancellation follows the retry policy
        except cfg.retry_on as e:
            last_error = e

        # Only reached after a retryable failure (success returns, others raise).
        if attempt == cfg.max_retries:
            raise RetryExhaustedError(cfg.max_retries + 1, last_error) from last_error
        delay = _delay_for(attempt)
        if on_retry is not None:
            on_retry(attempt + 1, last_error, delay)
        await asyncio.sleep(delay)

    # Only reachable when max_retries < 0, i.e. no attempt was ever made.
    raise RetryExhaustedError(0, last_error or RuntimeError("unknown"))

234 

235 

236# ── Timeout ───────────────────────────────────────────────────────────────── 

237 

class TimeoutError(Exception):
    """Raised by ``with_timeout`` when the awaited call misses its deadline.

    NOTE(review): this name shadows the builtin ``TimeoutError`` inside this
    module and is not a subclass of it, so a builtin ``except TimeoutError``
    in other modules will not catch it — catch it via this module's name.
    """

241 

242 

async def with_timeout(
    fn: Callable[..., Awaitable[T]],
    *args,
    timeout: float = 120.0,
    **kwargs,
) -> T:
    """Await ``fn(*args, **kwargs)``, giving up after ``timeout`` seconds.

    Args:
        fn: async callable to run.
        timeout: deadline in seconds, passed to ``asyncio.wait_for``.

    Returns:
        The result of ``fn``.

    Raises:
        TimeoutError: when the deadline passes; the underlying
            ``asyncio.TimeoutError`` is attached as ``__cause__``.
    """
    try:
        return await asyncio.wait_for(fn(*args, **kwargs), timeout=timeout)
    except asyncio.TimeoutError as exc:
        # NOTE(review): on Python 3.11+ asyncio.TimeoutError is the builtin
        # TimeoutError, so a TimeoutError raised by fn itself is also caught
        # and relabelled here — confirm that is acceptable for callers.
        raise TimeoutError(f"Operation timed out after {timeout}s") from exc

254 

255 

256# ── Fallback Chain ────────────────────────────────────────────────────────── 

257 

async def with_fallback(
    primary: Callable[..., Awaitable[T]],
    fallbacks: list[Callable[..., Awaitable[T]]],
    *args, **kwargs,
) -> T:
    """Try ``primary``, then each fallback in order; return the first success.

    Every candidate is awaited with the same ``*args, **kwargs``. Only
    ``Exception`` subclasses move on to the next candidate, so task
    cancellation (``asyncio.CancelledError``, a ``BaseException``) still
    propagates immediately.

    Args:
        primary: first callable to try.
        fallbacks: further callables tried in order; ``None`` or empty means
            there are no fallbacks.

    Returns:
        The result of the first candidate that does not raise.

    Raises:
        FallbackExhaustedError: every candidate raised. ``.errors`` holds the
            exceptions in attempt order, and the last one is chained as
            ``__cause__``.
    """
    errors: list[Exception] = []

    for candidate in (primary, *(fallbacks or ())):
        try:
            return await candidate(*args, **kwargs)
        except Exception as exc:
            errors.append(exc)

    # errors is non-empty: primary is always attempted.
    raise FallbackExhaustedError(errors) from errors[-1]

278 

279 

class FallbackExhaustedError(Exception):
    """Raised by ``with_fallback`` when the primary and every fallback failed.

    Only the first three errors are rendered into the message; all of them
    are kept on ``.errors`` in attempt order.
    """

    def __init__(self, errors: list[Exception]):
        preview = "; ".join(map(str, errors[:3]))
        super().__init__(f"All {len(errors)} attempts failed: {preview}")
        self.errors = errors

288 

289 

290# ── Composite Resilience ──────────────────────────────────────────────────── 

291 

@dataclass
class ResilienceConfig:
    """Bundle of settings consumed by ``ResilientCall``."""

    # Retry policy applied around every call.
    retry: RetryConfig = field(default_factory=RetryConfig)
    # Circuit-breaker settings; None means no breaker is created.
    circuit_breaker: CircuitBreakerConfig | None = None
    # Timeout in seconds applied to each individual attempt.
    timeout: float = 120.0

298 

299 

class ResilientCall:
    """Composite resilient caller: per-attempt timeout, retry with backoff, optional circuit breaker.

    Layering, outermost first: ``retry_with_backoff`` -> circuit breaker (if
    configured) -> ``with_timeout`` -> ``fn``. No fallback chain is applied
    here; combine with ``with_fallback`` if that is needed.
    """

    def __init__(self, config: ResilienceConfig | None = None, *, name: str = "default"):
        """
        Args:
            config: resilience settings; defaults to ``ResilienceConfig()``.
            name: circuit-breaker name, shown in ``CircuitBreakerOpenError``
                messages so that several callers can be told apart.
        """
        cfg = config or ResilienceConfig()
        self.retry_config = cfg.retry
        self.timeout = cfg.timeout
        self._breaker: CircuitBreaker | None = None
        if cfg.circuit_breaker is not None:
            self._breaker = CircuitBreaker(name, cfg.circuit_breaker)

    async def call(self, fn: Callable[..., Awaitable[T]], *args, **kwargs) -> T:
        """Run ``fn(*args, **kwargs)`` under the configured resilience policy.

        Returns:
            The result of the first successful attempt.

        Raises:
            Whatever ``retry_with_backoff`` raises, e.g. ``RetryExhaustedError``
            or ``CircuitBreakerOpenError``.
        """
        async def _attempt():
            # Each retry attempt gets its own fresh timeout budget.
            return await with_timeout(fn, *args, timeout=self.timeout, **kwargs)

        return await retry_with_backoff(
            _attempt,
            config=self.retry_config,
            circuit_breaker=self._breaker,
        )

320 

321 

# NOTE: the auto-generated "compat stubs" that used to sit here were removed.
# They re-bound retry_with_backoff, with_timeout and with_fallback to
# synchronous no-op functions returning None, replacing the real async
# implementations defined earlier in this module (e.g. ResilientCall.call
# then failed with a TypeError on ``await None``). Do not re-add them.