Coverage for agentos/tools/connection_pool.py: 0%
233 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 06:58 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 06:58 +0800
1"""
2Connection Pooling & Resource Management for AgentOS.
3Generic connection pool, rate limiter, resource quota manager, and health-checked pools.
4"""
6import threading
7import time
8from collections import deque
9from dataclasses import dataclass, field
10from typing import Any, Callable, Dict, Generic, List, Optional, Set, TypeVar
12T = TypeVar("T")
15# ============================================================================
16# ConnectionPool
17# ============================================================================
19@dataclass
20class _PooledConn(Generic[T]):
21 conn: T
22 created_at: float
23 last_used: float
24 borrowed: bool = False
27class ConnectionPool(Generic[T]):
28 """Thread-safe generic connection pool with health checks and idle eviction.
30 Supports: min/max sizing, health validation, auto-reconnect, idle timeout.
31 """
33 def __init__(
34 self,
35 factory: Callable[[], T],
36 health_check: Optional[Callable[[T], bool]] = None,
37 closer: Optional[Callable[[T], None]] = None,
38 min_size: int = 2,
39 max_size: int = 20,
40 max_idle: int = 10,
41 idle_timeout: float = 300.0,
42 checkout_timeout: float = 30.0,
43 ):
44 self._factory = factory
45 self._health_check = health_check
46 self._closer = closer
47 self._min_size = min_size
48 self._max_size = max_size
49 self._max_idle = max_idle
50 self._idle_timeout = idle_timeout
51 self._checkout_timeout = checkout_timeout
52 self._pool: deque[_PooledConn[T]] = deque()
53 self._lock = threading.RLock()
54 self._condition = threading.Condition(self._lock)
55 self._total_created: int = 0
56 self._total_borrowed: int = 0
57 self._total_returned: int = 0
58 self._total_failed_health: int = 0
59 self._closed: bool = False
61 def _create(self) -> _PooledConn[T]:
62 conn = self._factory()
63 self._total_created += 1
64 now = time.monotonic()
65 return _PooledConn(conn=conn, created_at=now, last_used=now)
67 def _validate(self, pc: _PooledConn[T]) -> bool:
68 if self._health_check is None:
69 return True
70 try:
71 ok = self._health_check(pc.conn)
72 if not ok:
73 self._total_failed_health += 1
74 return ok
75 except Exception:
76 self._total_failed_health += 1
77 return False
79 def acquire(self, timeout: Optional[float] = None) -> T:
80 """Borrow a connection from the pool. Blocks until available or timeout."""
81 if timeout is None:
82 timeout = self._checkout_timeout
84 with self._condition:
85 deadline = time.monotonic() + timeout
87 while True:
88 if self._closed:
89 raise RuntimeError("ConnectionPool is closed")
91 # Find a valid idle connection; evict unhealthy ones
92 unhealthy: List[_PooledConn[T]] = []
93 for pc in self._pool:
94 if pc.borrowed:
95 continue
96 if self._validate(pc):
97 pc.borrowed = True
98 pc.last_used = time.monotonic()
99 self._total_borrowed += 1
100 return pc.conn
101 else:
102 unhealthy.append(pc)
104 for pc in unhealthy:
105 if pc in self._pool:
106 self._pool.remove(pc)
107 self._close_conn(pc)
109 # Try to create a new one if under max
110 active = sum(1 for pc in self._pool if pc.borrowed)
111 if active + len([pc for pc in self._pool if not pc.borrowed]) < self._max_size:
112 pc = self._create()
113 pc.borrowed = True
114 self._total_borrowed += 1
115 self._pool.append(pc)
116 return pc.conn
118 # Wait for a connection to be returned
119 remaining = deadline - time.monotonic()
120 if remaining <= 0:
121 raise TimeoutError(f"Timed out waiting for connection after {timeout}s")
122 self._condition.wait(timeout=min(remaining, 1.0))
124 def release(self, conn: T) -> None:
125 """Return a connection to the pool."""
126 with self._condition:
127 for pc in self._pool:
128 if pc.conn is conn:
129 pc.borrowed = False
130 pc.last_used = time.monotonic()
131 self._total_returned += 1
132 self._condition.notify()
133 return
134 # Connection not in pool — close it
135 self._close_raw(conn)
136 self._condition.notify()
138 def _close_conn(self, pc: _PooledConn[T]) -> None:
139 self._close_raw(pc.conn)
141 def _close_raw(self, conn: T) -> None:
142 if self._closer:
143 try:
144 self._closer(conn)
145 except Exception:
146 pass
148 def warm_up(self) -> int:
149 """Pre-create connections up to min_size. Returns number created."""
150 count = 0
151 with self._condition:
152 idle = sum(1 for pc in self._pool if not pc.borrowed)
153 needed = self._min_size - idle
154 for _ in range(needed):
155 self._pool.append(self._create())
156 count += 1
157 return count
159 def evict_idle(self) -> int:
160 """Remove idle connections past timeout. Returns number evicted."""
161 now = time.monotonic()
162 count = 0
163 with self._condition:
164 # Preserve min_size
165 idle = [pc for pc in self._pool if not pc.borrowed]
166 to_keep = self._min_size
167 old_first = sorted(idle, key=lambda pc: pc.last_used)
168 for pc in old_first[to_keep:]:
169 if now - pc.last_used > self._idle_timeout:
170 self._pool.remove(pc)
171 self._close_conn(pc)
172 count += 1
173 return count
175 def close(self) -> None:
176 """Close all connections and shut down the pool."""
177 with self._condition:
178 self._closed = True
179 for pc in self._pool:
180 self._close_conn(pc)
181 self._pool.clear()
182 self._condition.notify_all()
184 @property
185 def stats(self) -> Dict[str, Any]:
186 with self._lock:
187 active = sum(1 for pc in self._pool if pc.borrowed)
188 idle = sum(1 for pc in self._pool if not pc.borrowed)
189 return {
190 "total_created": self._total_created,
191 "total_borrowed": self._total_borrowed,
192 "total_returned": self._total_returned,
193 "active": active,
194 "idle": idle,
195 "total": len(self._pool),
196 "failed_health_checks": self._total_failed_health,
197 "capacity": self._max_size,
198 }
200 def __enter__(self):
201 return self
203 def __exit__(self, *args):
204 self.close()
207# ============================================================================
208# RateLimiter
209# ============================================================================
class RateLimiter:
    """Thread-safe token bucket rate limiter with burst support."""

    def __init__(self, rate: float, burst: int = 1):
        """rate: tokens per second. burst: max tokens accumulated.

        The bucket starts full (``burst`` tokens).
        """
        self._rate = rate
        self._burst = burst
        self._tokens: float = burst
        self._last_refill: float = time.monotonic()
        self._lock = threading.Lock()
        self._total_acquired: int = 0
        self._total_rejected: int = 0

    def acquire(self, count: int = 1, timeout: Optional[float] = None) -> bool:
        """Try to acquire N tokens. Blocks up to timeout if not enough.

        Args:
            count: Number of tokens to take.
            timeout: Maximum seconds to wait. ``None`` waits as long as
                needed; ``0`` makes a single non-blocking attempt.

        Returns:
            True when the tokens were taken. False when the deadline passed,
            or immediately when the request can never be satisfied
            (``count`` exceeds ``burst``, or the refill rate is not positive).
        """
        # Compare against None explicitly: a timeout of 0 is a real deadline.
        deadline = None if timeout is None else time.monotonic() + timeout

        while True:
            with self._lock:
                self._refill()
                if self._tokens >= count:
                    self._tokens -= count
                    self._total_acquired += count
                    return True

                # The bucket is capped at burst and only fills at a positive
                # rate, so waiting could never succeed in these cases.
                if count > self._burst or self._rate <= 0:
                    self._total_rejected += count
                    return False

                # Wait for refill
                wait_time = (count - self._tokens) / self._rate
                if deadline is not None:
                    wait_time = min(wait_time, deadline - time.monotonic())
                    if wait_time <= 0:
                        self._total_rejected += count
                        return False

            # Sleep outside the lock so other threads can use the limiter.
            time.sleep(wait_time)

    def try_acquire(self, count: int = 1) -> bool:
        """Non-blocking attempt to acquire tokens."""
        with self._lock:
            self._refill()
            if self._tokens >= count:
                self._tokens -= count
                self._total_acquired += count
                return True
            self._total_rejected += count
            return False

    def _refill(self) -> None:
        """Credit tokens for the time elapsed since the last refill.

        The balance is capped at ``burst``. The caller must hold the lock.
        """
        now = time.monotonic()
        elapsed = now - self._last_refill
        self._tokens = min(self._burst, self._tokens + elapsed * self._rate)
        self._last_refill = now

    @property
    def available(self) -> float:
        """Number of tokens currently in the bucket, after refilling."""
        with self._lock:
            self._refill()
            return self._tokens

    @property
    def stats(self) -> Dict[str, Any]:
        """Snapshot of the configuration, token balance and counters."""
        with self._lock:
            self._refill()
            return {
                "rate": self._rate,
                "burst": self._burst,
                "tokens_available": round(self._tokens, 2),
                "total_acquired": self._total_acquired,
                "total_rejected": self._total_rejected,
            }
291# ============================================================================
292# ResourceQuota
293# ============================================================================
class ResourceQuota:
    """Track and enforce resource usage quotas per component.

    A global limit bounds the sum of all allocations; an optional
    per-component limit set through ``set_limit`` bounds one component.
    """

    def __init__(self, global_limit: int = 1024):
        """Create a quota manager with the given global limit."""
        self._global_limit = global_limit
        self._allocations: Dict[str, int] = {}
        self._limits: Dict[str, int] = {}  # per-component caps from set_limit
        self._lock = threading.Lock()

    def allocate(self, component: str, amount: int = 1) -> bool:
        """Try to allocate resources. Returns True if successful.

        The request is refused when it would push total usage above the
        global limit, or the component's usage above its own limit.
        """
        with self._lock:
            current_total = sum(self._allocations.values())
            if current_total + amount > self._global_limit:
                return False
            updated = self._allocations.get(component, 0) + amount
            cap = self._limits.get(component)
            if cap is not None and updated > cap:
                return False
            self._allocations[component] = updated
            return True

    def release(self, component: str, amount: int = 1) -> None:
        """Give back resources held by a component; usage never drops below 0."""
        with self._lock:
            current = self._allocations.get(component, 0)
            self._allocations[component] = max(0, current - amount)

    def set_limit(self, component: str, limit: int) -> None:
        """Set a per-component limit.

        The limit is remembered and applied to later ``allocate`` calls. If
        the component currently holds more than ``limit``, its recorded usage
        is reduced to ``limit``.
        """
        with self._lock:
            self._limits[component] = limit
            current = self._allocations.get(component, 0)
            if current > limit:
                self._allocations[component] = limit

    def get_usage(self, component: str) -> int:
        """Return the amount currently allocated to a component (0 if unknown)."""
        with self._lock:
            return self._allocations.get(component, 0)

    @property
    def total_used(self) -> int:
        """Sum of all current allocations."""
        with self._lock:
            return sum(self._allocations.values())

    @property
    def remaining(self) -> int:
        """Global limit minus total usage."""
        return self._global_limit - self.total_used

    @property
    def stats(self) -> Dict[str, Any]:
        """Snapshot of the global limit, usage and per-component allocations."""
        with self._lock:
            # Compute totals inline: the lock is not re-entrant, so calling
            # the total_used / remaining properties here would deadlock.
            used = sum(self._allocations.values())
            return {
                "global_limit": self._global_limit,
                "total_used": used,
                "remaining": self._global_limit - used,
                "allocations": dict(self._allocations),
            }
347# ============================================================================
348# Convenience Functions
349# ============================================================================
def create_connection_pool(
    factory: Callable[[], T],
    health_check: Optional[Callable[[T], bool]] = None,
    closer: Optional[Callable[[T], None]] = None,
    min_size: int = 2,
    max_size: int = 20,
    max_idle: int = 10,
    idle_timeout: float = 300.0,
    checkout_timeout: float = 30.0,
) -> ConnectionPool[T]:
    """Create a thread-safe connection pool.

    Every argument is forwarded unchanged to ``ConnectionPool``; the defaults
    match the constructor's own defaults.

    Args:
        factory: Zero-argument callable that returns a new connection.
        health_check: Optional predicate used to validate idle connections.
        closer: Optional callable used to dispose of a connection.
        min_size: Minimum pool size setting.
        max_size: Maximum pool size setting.
        max_idle: Maximum idle connections setting.
        idle_timeout: Idle timeout in seconds.
        checkout_timeout: Default ``acquire`` timeout in seconds.

    Returns:
        A new ``ConnectionPool`` configured with the given settings.
    """
    return ConnectionPool(
        factory,
        health_check=health_check,
        closer=closer,
        min_size=min_size,
        max_size=max_size,
        max_idle=max_idle,
        idle_timeout=idle_timeout,
        checkout_timeout=checkout_timeout,
    )
def create_rate_limiter(rate: float, burst: int = 10) -> RateLimiter:
    """Build a token bucket ``RateLimiter``.

    Args:
        rate: Tokens added per second.
        burst: Maximum number of tokens the bucket may hold. This helper
            defaults to 10.

    Returns:
        The newly constructed limiter.
    """
    limiter = RateLimiter(rate, burst)
    return limiter
def create_resource_quota(global_limit: int = 1024) -> ResourceQuota:
    """Build a ``ResourceQuota`` manager.

    Args:
        global_limit: Upper bound on the sum of all allocations.

    Returns:
        The newly constructed quota manager.
    """
    quota = ResourceQuota(global_limit)
    return quota