Coverage for agentos/health/__init__.py: 32%

111 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1"""AgentOS health checks — readiness, liveness, and dependency probes. 

2 

3Provides standard health-check endpoints for Kubernetes, Docker, and load balancers. 

4""" 

5 

6from __future__ import annotations 

7 

8import time 

9from dataclasses import dataclass, field 

10from enum import Enum 

11from typing import Callable, Optional 

12 

13 

class HealthStatus(Enum):
    """Health states used for individual check results and aggregate reports.

    The string values are what ends up in serialized reports (callers read
    them through ``.value``).
    """

    # The check (or every check in the report) passed.
    HEALTHY = "healthy"

    # Intermediate state between healthy and unhealthy.
    DEGRADED = "degraded"

    # The check (or at least one check in the report) failed.
    UNHEALTHY = "unhealthy"

21 

22 

@dataclass
class HealthCheck:
    """Definition of a single named health probe."""

    # Identifier of the probe; copied into the result produced for it.
    name: str

    # Zero-argument callable; a truthy return value means the probe passed.
    check_fn: Callable[[], bool]

    # Time budget for the probe, in seconds.
    # NOTE(review): HealthChecker._run_checks in this module does not read
    # this field, so the budget is currently advisory only.
    timeout_seconds: float = 5.0

    # Free-form, human-readable summary of what the probe verifies.
    description: str = ""

30 

31 

@dataclass
class CheckResult:
    """Outcome of running one health check."""

    # Name of the check that produced this result.
    name: str

    # Status assigned to the check.
    status: HealthStatus

    # Wall time the check took, in milliseconds.
    latency_ms: float

    # Optional detail text (for example a failure reason).
    message: str = ""

    def to_dict(self) -> dict:
        """Return a plain-dict view of this result.

        The status is emitted as its string value and the latency is rounded
        to two decimal places.
        """
        payload = dict(
            name=self.name,
            status=self.status.value,
            latency_ms=round(self.latency_ms, 2),
            message=self.message,
        )
        return payload

47 

48 

class HealthChecker:
    """Aggregate readiness and liveness checks into dict reports.

    Checks are registered with ``add_readiness`` / ``add_liveness`` and are
    executed sequentially, in registration order, every time a report is
    requested.

    NOTE(review): ``HealthCheck.timeout_seconds`` is not enforced here; a
    check function that blocks will block the whole report.
    """

    def __init__(self):
        self._readiness_checks: list[HealthCheck] = []
        self._liveness_checks: list[HealthCheck] = []

    def add_readiness(self, check: HealthCheck):
        """Register ``check`` to be run by ``readiness()``."""
        self._readiness_checks.append(check)

    def add_liveness(self, check: HealthCheck):
        """Register ``check`` to be run by ``liveness()``."""
        self._liveness_checks.append(check)

    def _run_checks(self, checks: list[HealthCheck]) -> tuple[HealthStatus, list[CheckResult]]:
        """Run ``checks`` in order and return ``(overall_status, results)``.

        A check passes when its ``check_fn`` returns a truthy value. A falsy
        return value or a raised ``Exception`` marks that check UNHEALTHY,
        and any UNHEALTHY check makes the overall status UNHEALTHY. An empty
        ``checks`` list yields HEALTHY with no results.
        """
        results: list[CheckResult] = []
        overall = HealthStatus.HEALTHY
        for chk in checks:
            start = time.monotonic()
            try:
                ok = chk.check_fn()
            except Exception as exc:
                ok = False
                # Some exceptions stringify to "" (e.g. a bare TimeoutError());
                # fall back to the type name so the failure is never blank.
                msg = str(exc) or type(exc).__name__
            else:
                msg = "ok" if ok else "check returned False"
            latency = (time.monotonic() - start) * 1000
            if ok:
                status = HealthStatus.HEALTHY
            else:
                # One failing check is enough to fail the whole group.
                status = overall = HealthStatus.UNHEALTHY
            results.append(CheckResult(name=chk.name, status=status, latency_ms=latency, message=msg))
        return (overall, results)

    def _build_report(self, checks: list[HealthCheck]) -> dict:
        """Run ``checks`` and format the outcome as a report dict."""
        overall, results = self._run_checks(checks)
        return {
            "status": overall.value,
            "timestamp": time.time(),
            "checks": [r.to_dict() for r in results],
        }

    def readiness(self) -> dict:
        """Run all readiness checks. Returns a dict suitable for a /health/ready endpoint."""
        return self._build_report(self._readiness_checks)

    def liveness(self) -> dict:
        """Run all liveness checks. Returns a dict suitable for a /health/live endpoint."""
        return self._build_report(self._liveness_checks)

    def all(self) -> dict:
        """Combined readiness + liveness report, suitable for /health.

        The combined status is the worse of the two sub-report statuses:
        UNHEALTHY wins over DEGRADED, which wins over HEALTHY.
        """
        readiness_report = self.readiness()
        liveness_report = self.liveness()
        statuses = {readiness_report["status"], liveness_report["status"]}
        if HealthStatus.UNHEALTHY.value in statuses:
            combined_status = HealthStatus.UNHEALTHY
        elif HealthStatus.DEGRADED.value in statuses:
            combined_status = HealthStatus.DEGRADED
        else:
            combined_status = HealthStatus.HEALTHY
        return {
            "status": combined_status.value,
            "timestamp": time.time(),
            "readiness": readiness_report,
            "liveness": liveness_report,
        }

118 

119 

120# ── Built-in checks ─────────────────────────────────────────────────────────── 

121 

122 

def check_openai_connectivity(api_key: Optional[str] = None) -> HealthCheck:
    """Build a check that verifies connectivity to the OpenAI API.

    The check sends a ``HEAD`` request to ``https://api.openai.com/v1/models``.

    Args:
        api_key: Optional API key. When given it is sent as a Bearer token
            and the request must succeed for the check to pass. When omitted
            the request is unauthenticated, so any HTTP response below 500
            (typically an authentication error) counts as "reachable".

    Returns:
        A ``HealthCheck`` named ``"openai-connectivity"``. Its check function
        never raises; every failure is reported as ``False``.
    """
    request_timeout = 5.0

    def _check() -> bool:
        import urllib.error
        import urllib.request

        try:
            req = urllib.request.Request("https://api.openai.com/v1/models", method="HEAD")
            if api_key:
                req.add_header("Authorization", f"Bearer {api_key}")
            # Close the response so the underlying connection is released.
            with urllib.request.urlopen(req, timeout=request_timeout):
                return True
        except urllib.error.HTTPError as http_err:
            # The server answered, so the network path works. Without a key
            # an auth rejection is expected and still proves reachability;
            # with a key a rejected request is a real failure.
            http_err.close()
            return not api_key and http_err.code < 500
        except Exception:
            # Best-effort probe: DNS, TLS, socket and timeout errors all mean
            # "not reachable".
            return False

    return HealthCheck(name="openai-connectivity", check_fn=_check, timeout_seconds=request_timeout,
                       description="Check OpenAI API reachability")

137 

138 

def check_vectorstore_health(db_instance=None) -> HealthCheck:
    """Build a check that asks a vector store whether it is healthy.

    The check fails when ``db_instance`` is ``None``, when it has no
    ``is_healthy`` attribute, or when calling ``is_healthy()`` raises;
    otherwise it returns whatever ``is_healthy()`` returns.
    """
    def _check() -> bool:
        # No store configured: nothing to talk to.
        if db_instance is None:
            return False
        try:
            if not hasattr(db_instance, "is_healthy"):
                return False
            return db_instance.is_healthy()
        except Exception:
            # Any error raised by the store counts as a failed probe.
            return False

    return HealthCheck(
        name="vectorstore-health",
        check_fn=_check,
        timeout_seconds=5.0,
        description="Check vector store connection",
    )

150 

151 

def check_disk_space(threshold_bytes: int = 100 * 1024 * 1024, path: str = "/") -> HealthCheck:
    """Build a check that free disk space meets a threshold (default 100MB).

    Args:
        threshold_bytes: Minimum number of free bytes required to pass.
        path: Path on the filesystem to inspect. Defaults to ``"/"``; pass
            another path to probe a different mounted volume.

    Returns:
        A ``HealthCheck`` named ``"disk-space"``. Its check function lets
        errors from ``shutil.disk_usage`` (for example a missing path)
        propagate to the caller.
    """
    def _check() -> bool:
        import shutil
        return shutil.disk_usage(path).free >= threshold_bytes

    description = f"Free disk space >= {threshold_bytes/1024/1024:.0f}MB"
    if path != "/":
        # Only mention the path when it differs from the historical default,
        # so the default description stays unchanged.
        description += f" on {path}"
    return HealthCheck(name="disk-space", check_fn=_check, timeout_seconds=1.0,
                       description=description)

160 

161 

def check_memory(threshold_bytes: int = 50 * 1024 * 1024) -> HealthCheck:
    """Build a check that available system memory meets a threshold (default 50MB).

    The check reads the ``MemAvailable:`` entry of ``/proc/meminfo``,
    multiplies its value by 1024 and compares the result with
    ``threshold_bytes``. If the file cannot be read or parsed, or has no
    such entry, the check passes.
    """
    def _check() -> bool:
        try:
            with open("/proc/meminfo") as meminfo:
                for entry in meminfo:
                    if not entry.startswith("MemAvailable:"):
                        continue
                    available_kb = int(entry.split()[1])
                    return available_kb * 1024 >= threshold_bytes
        except Exception:
            # Cannot determine memory: assume OK rather than fail the probe.
            return True
        # No MemAvailable entry found: also treated as OK.
        return True

    return HealthCheck(
        name="memory",
        check_fn=_check,
        timeout_seconds=1.0,
        description=f"Available memory >= {threshold_bytes/1024/1024:.0f}MB",
    )

176 

177 

178# ── Default health checker factory ──────────────────────────────────────────── 

179 

180 

def create_default_health_checker() -> HealthChecker:
    """Return a HealthChecker pre-loaded with the built-in checks.

    The memory check is registered as a liveness check and the disk-space
    check as a readiness check, both with their default thresholds.
    """
    checker = HealthChecker()
    memory_check = check_memory()
    checker.add_liveness(memory_check)
    disk_check = check_disk_space()
    checker.add_readiness(disk_check)
    return checker

185 hc.add_readiness(check_disk_space()) 

186 return hc