Coverage for agentos/health/__init__.py: 32%
111 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""AgentOS health checks — readiness, liveness, and dependency probes.
3Provides standard health-check endpoints for Kubernetes, Docker, and load balancers.
4"""
6from __future__ import annotations
8import time
9from dataclasses import dataclass, field
10from enum import Enum
11from typing import Callable, Optional
class HealthStatus(Enum):
    """Enumeration of health states.

    Each member's value is the lowercase string emitted in serialized
    reports.
    """

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
@dataclass
class HealthCheck:
    """Definition of a single health check."""

    # Identifier reported alongside the check's result.
    name: str
    # Zero-argument probe; a truthy return value means the check passed.
    check_fn: Callable[[], bool]
    # Time budget for the probe, in seconds (defaults to 5.0).
    timeout_seconds: float = 5.0
    # Optional human-readable summary of what the check verifies.
    description: str = ""
@dataclass
class CheckResult:
    """Outcome of running a single health check."""

    # Name of the check that produced this result.
    name: str
    # Status assigned to the check.
    status: HealthStatus
    # How long the check took, in milliseconds.
    latency_ms: float
    # Free-form detail about the outcome (empty by default).
    message: str = ""

    def to_dict(self) -> dict:
        """Return a plain-dict view of the result.

        The status is emitted as its enum ``value`` and the latency is
        rounded to two decimal places.
        """
        rounded_latency = round(self.latency_ms, 2)
        status_value = self.status.value
        return dict(
            name=self.name,
            status=status_value,
            latency_ms=rounded_latency,
            message=self.message,
        )
class HealthChecker:
    """Aggregate readiness and liveness checks.

    Checks are registered with :meth:`add_readiness` / :meth:`add_liveness`
    and are executed synchronously, in registration order, every time a
    report is requested.
    """

    def __init__(self) -> None:
        self._readiness_checks: list[HealthCheck] = []
        self._liveness_checks: list[HealthCheck] = []

    def add_readiness(self, check: HealthCheck) -> None:
        """Register ``check`` to be run by :meth:`readiness`."""
        self._readiness_checks.append(check)

    def add_liveness(self, check: HealthCheck) -> None:
        """Register ``check`` to be run by :meth:`liveness`."""
        self._liveness_checks.append(check)

    def _run_checks(self, checks: list[HealthCheck]) -> tuple[HealthStatus, list[CheckResult]]:
        """Run ``checks`` in order and return ``(overall_status, results)``.

        A check passes when its ``check_fn`` returns a truthy value. A falsy
        return value or a raised exception marks the check UNHEALTHY. The
        overall status is UNHEALTHY as soon as any check fails and HEALTHY
        otherwise (including when ``checks`` is empty); DEGRADED is never
        produced here.

        NOTE: ``HealthCheck.timeout_seconds`` is not enforced by this
        method; a blocking ``check_fn`` blocks the whole report.
        """
        results: list[CheckResult] = []
        for chk in checks:
            start = time.monotonic()
            try:
                ok = chk.check_fn()
            except Exception as exc:
                ok = False
                # Some exceptions stringify to "", which would leave the
                # report without any hint; fall back to the exception type.
                msg = str(exc) or type(exc).__name__
            else:
                msg = "ok" if ok else "check returned False"
            latency_ms = (time.monotonic() - start) * 1000
            status = HealthStatus.HEALTHY if ok else HealthStatus.UNHEALTHY
            results.append(
                CheckResult(name=chk.name, status=status, latency_ms=latency_ms, message=msg)
            )

        any_failed = any(res.status is HealthStatus.UNHEALTHY for res in results)
        overall = HealthStatus.UNHEALTHY if any_failed else HealthStatus.HEALTHY
        return (overall, results)

    def _report(self, checks: list[HealthCheck]) -> dict:
        """Run ``checks`` and wrap the outcome in a serializable report dict."""
        overall, results = self._run_checks(checks)
        return {
            "status": overall.value,
            "timestamp": time.time(),
            "checks": [res.to_dict() for res in results],
        }

    def readiness(self) -> dict:
        """Run all readiness checks. Returns a dict suitable for a /health/ready endpoint."""
        return self._report(self._readiness_checks)

    def liveness(self) -> dict:
        """Run all liveness checks. Returns a dict suitable for a /health/live endpoint."""
        return self._report(self._liveness_checks)

    def all(self) -> dict:
        """Combined readiness + liveness report, suitable for /health.

        The combined status is the worse of the two sub-reports:
        unhealthy beats degraded, which beats healthy.
        """
        ready = self.readiness()
        live = self.liveness()
        statuses = (ready["status"], live["status"])
        if HealthStatus.UNHEALTHY.value in statuses:
            combined_status = HealthStatus.UNHEALTHY
        elif HealthStatus.DEGRADED.value in statuses:
            combined_status = HealthStatus.DEGRADED
        else:
            combined_status = HealthStatus.HEALTHY
        return {
            "status": combined_status.value,
            "timestamp": time.time(),
            "readiness": ready,
            "liveness": live,
        }
120# ── Built-in checks ───────────────────────────────────────────────────────────
def check_openai_connectivity(api_key: Optional[str] = None) -> HealthCheck:
    """Build a health check that verifies connectivity to the OpenAI API.

    The probe sends a ``HEAD`` request to ``https://api.openai.com/v1/models``
    and passes only if ``urlopen`` completes without raising. Any exception
    (network failure, timeout, or an HTTP error status surfaced by
    ``urlopen`` as an exception) makes the probe return ``False``.

    Args:
        api_key: Optional API key; when truthy it is sent as a
            ``Authorization: Bearer`` header.

    Returns:
        A ``HealthCheck`` named ``"openai-connectivity"``.
    """
    # Single source of truth for the socket timeout and the advertised
    # HealthCheck timeout, so the two cannot drift apart.
    timeout_seconds = 5.0

    def _check() -> bool:
        try:
            import urllib.request

            req = urllib.request.Request("https://api.openai.com/v1/models", method="HEAD")
            if api_key:
                req.add_header("Authorization", f"Bearer {api_key}")
            # Use the response as a context manager so the underlying
            # connection is closed instead of being left to the GC.
            with urllib.request.urlopen(req, timeout=timeout_seconds):
                return True
        except Exception:
            # Best-effort probe: every failure mode means "not reachable".
            return False

    return HealthCheck(
        name="openai-connectivity",
        check_fn=_check,
        timeout_seconds=timeout_seconds,
        description="Check OpenAI API reachability",
    )
def check_vectorstore_health(db_instance=None) -> HealthCheck:
    """Build a health check for the vector store connection.

    The probe fails when ``db_instance`` is ``None``, when it has no
    ``is_healthy`` attribute, or when calling ``is_healthy()`` raises;
    otherwise it returns whatever ``is_healthy()`` returns.
    """

    def _check() -> bool:
        # No store configured: report as failing.
        if db_instance is None:
            return False
        try:
            if not hasattr(db_instance, "is_healthy"):
                return False
            return db_instance.is_healthy()
        except Exception:
            # Any error raised while probing counts as unhealthy.
            return False

    return HealthCheck(
        name="vectorstore-health",
        check_fn=_check,
        timeout_seconds=5.0,
        description="Check vector store connection",
    )
def check_disk_space(threshold_bytes: int = 100 * 1024 * 1024, path: str = "/") -> HealthCheck:
    """Build a health check that free disk space meets a threshold.

    Args:
        threshold_bytes: Minimum number of free bytes required
            (default 100MB).
        path: Any path on the filesystem to inspect. Defaults to ``"/"``,
            which matches the previous hard-coded behaviour; pass a data
            directory when it lives on a separate volume.

    Returns:
        A ``HealthCheck`` named ``"disk-space"`` whose probe returns
        ``True`` when ``shutil.disk_usage(path).free >= threshold_bytes``.
        Errors raised by ``shutil.disk_usage`` are not caught by the probe.
    """

    def _check() -> bool:
        import shutil

        usage = shutil.disk_usage(path)
        return usage.free >= threshold_bytes

    return HealthCheck(
        name="disk-space",
        check_fn=_check,
        timeout_seconds=1.0,
        description=f"Free disk space >= {threshold_bytes/1024/1024:.0f}MB",
    )
def check_memory(threshold_bytes: int = 50 * 1024 * 1024) -> HealthCheck:
    """Build a health check that available memory meets a threshold.

    The probe reads the ``MemAvailable:`` entry from ``/proc/meminfo``
    (a value in kB) and compares it, converted to bytes, against
    ``threshold_bytes`` (default 50MB). If the file cannot be read or
    parsed, or has no such entry, the probe passes.
    """

    def _check() -> bool:
        try:
            with open("/proc/meminfo") as meminfo:
                entry = next(
                    (row for row in meminfo if row.startswith("MemAvailable:")),
                    None,
                )
            if entry is not None:
                available_kb = int(entry.split()[1])
                return available_kb * 1024 >= threshold_bytes
        except Exception:
            # Can't check (e.g. file missing or unparsable): assume OK.
            return True
        # No MemAvailable entry found: assume OK as well.
        return True

    return HealthCheck(
        name="memory",
        check_fn=_check,
        timeout_seconds=1.0,
        description=f"Available memory >= {threshold_bytes/1024/1024:.0f}MB",
    )
178# ── Default health checker factory ────────────────────────────────────────────
def create_default_health_checker() -> HealthChecker:
    """Return a HealthChecker pre-loaded with the built-in checks.

    The memory check is registered as a liveness check and the disk-space
    check as a readiness check, both with their default thresholds.
    """
    checker = HealthChecker()
    memory_check = check_memory()
    checker.add_liveness(memory_check)
    disk_check = check_disk_space()
    checker.add_readiness(disk_check)
    return checker