Coverage for agentos/tools/connection_pool.py: 0%

233 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-03 06:58 +0800

1""" 

2Connection Pooling & Resource Management for AgentOS. 

3Generic connection pool, rate limiter, resource quota manager, and health-checked pools. 

4""" 

5 

6import threading 

7import time 

8from collections import deque 

9from dataclasses import dataclass, field 

10from typing import Any, Callable, Dict, Generic, List, Optional, Set, TypeVar 

11 

T = TypeVar("T")  # type of the connection objects handled by ConnectionPool


# ============================================================================
# ConnectionPool
# ============================================================================

@dataclass(eq=False)
class _PooledConn(Generic[T]):
    """Pool bookkeeping for a single connection.

    Declared with ``eq=False`` so entries compare by identity. With the
    default value-based ``__eq__``, two wrappers holding equal connections
    with identical timestamps would be interchangeable to ``deque.remove``
    and ``in``, letting the pool drop or close the wrong entry.
    """

    conn: T  # object returned by the pool's factory
    created_at: float  # time.monotonic() when the connection was created
    last_used: float  # time.monotonic() of the most recent checkout/return
    borrowed: bool = False  # True while the connection is checked out


class ConnectionPool(Generic[T]):
    """Thread-safe generic connection pool with health checks and idle eviction.

    Supports: min/max sizing, health validation, auto-reconnect (unhealthy
    connections are discarded and replaced), idle timeout, and a cap on the
    number of idle connections retained when connections are returned.

    Args:
        factory: Zero-argument callable that opens a new connection.
        health_check: Optional predicate run on an idle connection before it
            is handed out. A falsy result or an exception marks it unhealthy;
            it is then closed and removed from the pool.
        closer: Optional callable used to close a connection. Exceptions it
            raises are swallowed.
        min_size: Idle connections ``warm_up`` tops the pool up to, and the
            number of idle connections ``evict_idle`` always keeps.
        max_size: Maximum connections tracked by the pool (borrowed + idle).
        max_idle: Maximum idle connections kept by ``release``; a connection
            returned while this many are already idle is closed instead.
        idle_timeout: Seconds a connection may sit idle before
            ``evict_idle`` is allowed to close it.
        checkout_timeout: Default number of seconds ``acquire`` waits.
    """

    def __init__(
        self,
        factory: Callable[[], T],
        health_check: Optional[Callable[[T], bool]] = None,
        closer: Optional[Callable[[T], None]] = None,
        min_size: int = 2,
        max_size: int = 20,
        max_idle: int = 10,
        idle_timeout: float = 300.0,
        checkout_timeout: float = 30.0,
    ):
        self._factory = factory
        self._health_check = health_check
        self._closer = closer
        self._min_size = min_size
        self._max_size = max_size
        self._max_idle = max_idle
        self._idle_timeout = idle_timeout
        self._checkout_timeout = checkout_timeout
        # Every connection the pool tracks, borrowed or idle; guarded by _lock.
        self._pool: deque[_PooledConn[T]] = deque()
        self._lock = threading.RLock()
        self._condition = threading.Condition(self._lock)
        self._total_created: int = 0
        self._total_borrowed: int = 0
        self._total_returned: int = 0
        self._total_failed_health: int = 0
        self._closed: bool = False

    def _create(self) -> _PooledConn[T]:
        """Open a connection via the factory and wrap it (caller tracks it)."""
        conn = self._factory()
        self._total_created += 1
        now = time.monotonic()
        return _PooledConn(conn=conn, created_at=now, last_used=now)

    def _validate(self, pc: _PooledConn[T]) -> bool:
        """Run the health check; a failure or exception counts as unhealthy."""
        if self._health_check is None:
            return True
        try:
            ok = self._health_check(pc.conn)
            if not ok:
                self._total_failed_health += 1
            return ok
        except Exception:
            self._total_failed_health += 1
            return False

    def _discard(self, pc: _PooledConn[T]) -> None:
        """Remove *pc* from the pool (by identity) and close its connection."""
        for index, candidate in enumerate(self._pool):
            if candidate is pc:
                del self._pool[index]
                break
        self._close_conn(pc)

    def acquire(self, timeout: Optional[float] = None) -> T:
        """Borrow a connection from the pool, blocking until one is available.

        Args:
            timeout: Maximum seconds to wait; ``None`` uses ``checkout_timeout``.

        Returns:
            A connection, marked borrowed until it is passed to ``release``.

        Raises:
            RuntimeError: If the pool is closed (checked on every wake-up).
            TimeoutError: If no connection is obtained before the deadline.
        """
        if timeout is None:
            timeout = self._checkout_timeout

        with self._condition:
            deadline = time.monotonic() + timeout

            while True:
                if self._closed:
                    raise RuntimeError("ConnectionPool is closed")

                # Snapshot: unhealthy entries are removed while scanning.
                for pc in list(self._pool):
                    if pc.borrowed:
                        continue
                    if self._validate(pc):
                        pc.borrowed = True
                        pc.last_used = time.monotonic()
                        self._total_borrowed += 1
                        return pc.conn
                    # Drop it now so it is not re-checked (and re-counted) later.
                    self._discard(pc)

                # No usable idle connection: open one if under capacity.
                if len(self._pool) < self._max_size:
                    pc = self._create()
                    pc.borrowed = True
                    self._total_borrowed += 1
                    self._pool.append(pc)
                    return pc.conn

                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise TimeoutError(f"Timed out waiting for connection after {timeout}s")
                # Wake at least once a second to re-check closed/deadline.
                self._condition.wait(timeout=min(remaining, 1.0))

    def release(self, conn: T) -> None:
        """Return a connection to the pool.

        If ``max_idle`` connections are already idle, the returned connection
        is closed and dropped instead of being kept. A connection the pool
        does not track (for example one borrowed before ``close()``) is closed.
        """
        with self._condition:
            pc = next((entry for entry in self._pool if entry.conn is conn), None)
            if pc is None:
                # Connection not in pool — close it
                self._close_raw(conn)
            else:
                self._total_returned += 1
                idle_others = sum(
                    1 for entry in self._pool if entry is not pc and not entry.borrowed
                )
                if idle_others >= self._max_idle:
                    self._discard(pc)
                else:
                    pc.borrowed = False
                    pc.last_used = time.monotonic()
            # Either an idle connection or a free slot is now available.
            self._condition.notify()

    def _close_conn(self, pc: _PooledConn[T]) -> None:
        """Close the raw connection wrapped by *pc*."""
        self._close_raw(pc.conn)

    def _close_raw(self, conn: T) -> None:
        """Close *conn* with the configured closer, if any."""
        if self._closer:
            try:
                self._closer(conn)
            except Exception:
                # Best-effort: a failing closer must not break release/eviction/close.
                pass

    def warm_up(self) -> int:
        """Pre-create idle connections up to ``min_size``.

        Never grows the pool beyond ``max_size`` and does nothing once the
        pool is closed.

        Returns:
            Number of connections created.
        """
        with self._condition:
            if self._closed:
                return 0
            idle = sum(1 for pc in self._pool if not pc.borrowed)
            needed = min(self._min_size - idle, self._max_size - len(self._pool))
            count = 0
            for _ in range(max(0, needed)):
                self._pool.append(self._create())
                count += 1
            if count:
                # Let blocked acquirers pick up the new idle connections.
                self._condition.notify(count)
            return count

    def evict_idle(self) -> int:
        """Close idle connections unused for longer than ``idle_timeout``.

        The ``min_size`` most recently used idle connections are always kept.
        Only older idle connections are eviction candidates.

        Returns:
            Number of connections evicted.
        """
        with self._condition:
            now = time.monotonic()
            idle = [pc for pc in self._pool if not pc.borrowed]
            newest_first = sorted(idle, key=lambda pc: pc.last_used, reverse=True)
            expired = [
                pc
                for pc in newest_first[self._min_size:]
                if now - pc.last_used > self._idle_timeout
            ]
            for pc in expired:
                self._discard(pc)
            return len(expired)

    def close(self) -> None:
        """Close all tracked connections and shut down the pool.

        Borrowed connections are closed as well. When a holder later calls
        ``release`` on one, the pool no longer tracks it and invokes the closer
        again, so the closer should tolerate repeated calls. Threads blocked
        in ``acquire`` are woken and raise ``RuntimeError``.
        """
        with self._condition:
            self._closed = True
            for pc in self._pool:
                self._close_conn(pc)
            self._pool.clear()
            self._condition.notify_all()

    @property
    def stats(self) -> Dict[str, Any]:
        """Snapshot of pool counters and current borrowed/idle sizes."""
        with self._lock:
            active = sum(1 for pc in self._pool if pc.borrowed)
            idle = sum(1 for pc in self._pool if not pc.borrowed)
            return {
                "total_created": self._total_created,
                "total_borrowed": self._total_borrowed,
                "total_returned": self._total_returned,
                "active": active,
                "idle": idle,
                "total": len(self._pool),
                "failed_health_checks": self._total_failed_health,
                "capacity": self._max_size,
            }

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

205 

206 

# ============================================================================
# RateLimiter
# ============================================================================

class RateLimiter:
    """Thread-safe token bucket rate limiter with burst support.

    Tokens accrue continuously at ``rate`` per second up to a ceiling of
    ``burst``. The bucket starts full, and each acquisition spends ``count``
    tokens.
    """

    def __init__(self, rate: float, burst: int = 1):
        """rate: tokens per second. burst: max tokens accumulated."""
        self._rate = rate
        self._burst = burst
        self._tokens: float = burst
        self._last_refill: float = time.monotonic()
        self._lock = threading.Lock()
        self._total_acquired: int = 0
        self._total_rejected: int = 0

    def acquire(self, count: int = 1, timeout: Optional[float] = None) -> bool:
        """Acquire ``count`` tokens, sleeping until they accrue if necessary.

        Args:
            count: Number of tokens to spend.
            timeout: Maximum seconds to wait. ``None`` waits indefinitely;
                ``0`` (or a negative value) makes a single non-blocking try.

        Returns:
            True if the tokens were taken. False if they were not available
            before the deadline, or can never become available (``count``
            exceeds ``burst``, or ``rate <= 0`` so the bucket never refills).
        """
        # Compare against None: a timeout of 0 must mean "don't wait", not "wait forever".
        deadline = None if timeout is None else time.monotonic() + timeout

        with self._lock:
            while True:
                self._refill()
                if self._tokens >= count:
                    self._tokens -= count
                    self._total_acquired += count
                    return True

                # The bucket can never hold enough tokens; waiting would spin forever.
                if count > self._burst or self._rate <= 0:
                    self._total_rejected += count
                    return False

                wait_time = (count - self._tokens) / self._rate
                if deadline is not None:
                    wait_time = min(wait_time, deadline - time.monotonic())
                    if wait_time <= 0:
                        self._total_rejected += count
                        return False

                # Sleep without holding the lock so other threads can proceed.
                self._lock.release()
                try:
                    time.sleep(wait_time)
                finally:
                    self._lock.acquire()

    def try_acquire(self, count: int = 1) -> bool:
        """Non-blocking attempt to acquire tokens."""
        with self._lock:
            self._refill()
            if self._tokens >= count:
                self._tokens -= count
                self._total_acquired += count
                return True
            self._total_rejected += count
            return False

    def _refill(self) -> None:
        """Credit tokens earned since the last refill, capped at ``burst``.

        The caller must hold the lock.
        """
        now = time.monotonic()
        elapsed = now - self._last_refill
        self._tokens = min(self._burst, self._tokens + elapsed * self._rate)
        self._last_refill = now

    @property
    def available(self) -> float:
        """Tokens currently in the bucket (after refilling)."""
        with self._lock:
            self._refill()
            return self._tokens

    @property
    def stats(self) -> Dict[str, Any]:
        """Snapshot of configuration, available tokens and counters."""
        with self._lock:
            self._refill()
            return {
                "rate": self._rate,
                "burst": self._burst,
                "tokens_available": round(self._tokens, 2),
                "total_acquired": self._total_acquired,
                "total_rejected": self._total_rejected,
            }

289 

290 

# ============================================================================
# ResourceQuota
# ============================================================================

class ResourceQuota:
    """Track and enforce resource usage quotas, globally and per component."""

    def __init__(self, global_limit: int = 1024):
        self._global_limit = global_limit
        self._allocations: Dict[str, int] = {}
        self._limits: Dict[str, int] = {}  # per-component caps set via set_limit
        # Non-reentrant: methods holding it must use _used(), not the properties.
        self._lock = threading.Lock()

    def _used(self) -> int:
        """Total allocated across all components; caller must hold the lock."""
        return sum(self._allocations.values())

    def allocate(self, component: str, amount: int = 1) -> bool:
        """Try to allocate resources. Returns True if successful.

        The request is refused if it would exceed the global limit or the
        component's own limit (when one was set with ``set_limit``).
        """
        with self._lock:
            current = self._allocations.get(component, 0)
            limit = self._limits.get(component)
            if limit is not None and current + amount > limit:
                return False
            if self._used() + amount > self._global_limit:
                return False
            self._allocations[component] = current + amount
            return True

    def release(self, component: str, amount: int = 1) -> None:
        """Give back ``amount`` units for ``component`` (never below zero)."""
        with self._lock:
            current = self._allocations.get(component, 0)
            self._allocations[component] = max(0, current - amount)

    def set_limit(self, component: str, limit: int) -> None:
        """Set a per-component cap enforced by ``allocate``.

        If the component currently holds more than ``limit``, its recorded
        usage is truncated to ``limit``, as before.
        """
        with self._lock:
            self._limits[component] = limit
            current = self._allocations.get(component, 0)
            if current > limit:
                self._allocations[component] = limit

    def get_usage(self, component: str) -> int:
        """Units currently allocated to ``component`` (0 if unknown)."""
        with self._lock:
            return self._allocations.get(component, 0)

    @property
    def total_used(self) -> int:
        """Units allocated across all components."""
        with self._lock:
            return self._used()

    @property
    def remaining(self) -> int:
        """Units still available under the global limit."""
        with self._lock:
            return self._global_limit - self._used()

    @property
    def stats(self) -> Dict[str, Any]:
        """Consistent snapshot of limits and allocations.

        Computed under a single lock acquisition; calling ``total_used`` or
        ``remaining`` here would re-acquire the non-reentrant lock and deadlock.
        """
        with self._lock:
            used = self._used()
            return {
                "global_limit": self._global_limit,
                "total_used": used,
                "remaining": self._global_limit - used,
                "allocations": dict(self._allocations),
            }

345 

346 

# ============================================================================
# Convenience Functions
# ============================================================================

def create_connection_pool(
    factory: Callable[[], T],
    health_check: Optional[Callable[[T], bool]] = None,
    closer: Optional[Callable[[T], None]] = None,
    min_size: int = 2,
    max_size: int = 20,
    *,
    max_idle: int = 10,
    idle_timeout: float = 300.0,
    checkout_timeout: float = 30.0,
) -> ConnectionPool[T]:
    """Create a thread-safe connection pool.

    Args:
        factory: Zero-argument callable that opens a new connection.
        health_check: Optional predicate validating an idle connection.
        closer: Optional callable that closes a connection.
        min_size: Target number of idle connections.
        max_size: Maximum number of connections the pool tracks.
        max_idle: Keyword-only; maximum idle connections kept on return.
        idle_timeout: Keyword-only; idle seconds before eviction is allowed.
        checkout_timeout: Keyword-only; default ``acquire`` wait in seconds.

    The keyword-only defaults mirror those of ``ConnectionPool.__init__``.

    Returns:
        A new ``ConnectionPool``.
    """
    return ConnectionPool(
        factory,
        health_check=health_check,
        closer=closer,
        min_size=min_size,
        max_size=max_size,
        max_idle=max_idle,
        idle_timeout=idle_timeout,
        checkout_timeout=checkout_timeout,
    )

366 

367 

def create_rate_limiter(rate: float, burst: int = 10) -> RateLimiter:
    """Build a token bucket rate limiter.

    Args:
        rate: Tokens replenished per second.
        burst: Bucket capacity. Note this helper defaults to 10, whereas
            ``RateLimiter`` itself defaults to 1.

    Returns:
        A freshly constructed ``RateLimiter``.
    """
    limiter = RateLimiter(rate=rate, burst=burst)
    return limiter

371 

372 

def create_resource_quota(global_limit: int = 1024) -> ResourceQuota:
    """Build a resource quota manager.

    Args:
        global_limit: Total units that may be allocated across all components.

    Returns:
        A freshly constructed ``ResourceQuota``.
    """
    quota = ResourceQuota(global_limit=global_limit)
    return quota