Coverage for agentos/observability/metrics.py: 48%

170 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2AgentOS v0.70 — 性能指标与可观测性增强。 

3基因来源: Prometheus metrics + OpenTelemetry 

4 

5提供: 

6- 延迟分位数 (p50/p95/p99) 

7- 吞吐量统计 (RPS) 

8- 错误率追踪 

9- 缓存命中率 

10- TTL-based环形缓冲区 

11""" 

12 

13from __future__ import annotations 

14 

15import threading 

16import time 

17from collections import deque 

18from dataclasses import dataclass, field 

19from typing import Any 

20 

21 

@dataclass
class MetricSnapshot:
    """Point-in-time copy of collector metrics, used for export/serialization.

    Attributes:
        timestamp: ``time.time()`` at the moment the snapshot object is built.
        histograms: Histogram statistics keyed by ``step`` / ``model`` / ``tool``.
        counters: Counter totals keyed by short name.
        gauges: Gauge readings keyed by short name.
        derived_metrics: Ratios/rates computed by the collector
            (``rps``, ``error_rate``, ``cache_hit_rate``).
    """

    timestamp: float = field(default_factory=time.time)
    histograms: dict[str, dict] = field(default_factory=dict)
    counters: dict[str, int] = field(default_factory=dict)
    gauges: dict[str, float] = field(default_factory=dict)
    derived_metrics: dict[str, float] = field(default_factory=dict)

    def to_json(self) -> str:
        """Serialize the snapshot to a JSON string.

        The payload uses the abbreviated keys ``ts``, ``h``, ``c``, ``g`` and
        ``d`` for timestamp, histograms, counters, gauges and derived metrics.
        """
        import json

        return json.dumps({
            "ts": self.timestamp,
            "h": self.histograms,
            "c": self.counters,
            "g": self.gauges,
            "d": self.derived_metrics,
        })

    @classmethod
    def from_collector(cls, collector: "MetricsCollector") -> "MetricSnapshot":
        """Build a snapshot from a collector.

        Every value is taken from the single dict returned by
        ``collector.snapshot()``. Re-reading the live counters and gauges
        afterwards (as was done previously) could pair derived metrics with
        counter totals captured at a different instant.

        Args:
            collector: Object exposing ``snapshot()`` with the collector's
                dictionary layout.

        Returns:
            A new ``MetricSnapshot`` populated from that dictionary.
        """
        s = collector.snapshot()
        throughput = s["throughput"]
        return cls(
            histograms={
                "step": s["latency_step_ms"],
                "model": s["latency_model_ms"],
                "tool": s["latency_tool_ms"],
            },
            counters={
                "steps": throughput["steps_total"],
                "model_calls": throughput["model_calls"],
                "tool_calls": throughput["tool_calls"],
                "errors": s["errors_total"],
                "cache_hits": s["cache_hits"],
                "cache_misses": s["cache_misses"],
            },
            gauges={
                "active_agents": s["active_agents"],
                "queue_depth": s["queue_depth"],
            },
            derived_metrics={
                "rps": throughput["rps"],
                "error_rate": s["error_rate"],
                "cache_hit_rate": s["cache_hit_rate"],
            },
        )

52 

53 

@dataclass
class MetricPoint:
    """One recorded sample: a value, when it was taken, and optional labels.

    Attributes:
        timestamp: Time of the sample; ``Histogram.observe`` fills this with
            ``time.time()``.
        value: The observed measurement.
        labels: Free-form string tags attached to the sample. Each instance
            gets its own empty dict when none is supplied.
    """

    timestamp: float
    value: float
    labels: dict[str, str] = field(default_factory=dict)

60 

61 

@dataclass
class Histogram:
    """Sliding-window histogram used to compute quantiles over recent samples.

    Samples older than ``window_seconds`` are discarded lazily each time the
    histogram is written to or read, and the number of retained samples is
    capped at ``max_size`` (oldest evicted first). Every public operation
    holds ``_lock`` while it touches ``_points``.

    Attributes:
        name: Identifier reported in ``stats()``.
        window_seconds: Maximum age of a retained sample, in seconds.
        max_size: Maximum number of retained samples.
    """

    name: str
    window_seconds: float = 300.0
    max_size: int = 10000
    _points: deque = field(default_factory=deque)
    _lock: threading.Lock = field(default_factory=threading.Lock)

    def observe(self, value: float, **labels):
        """Record one sample stamped with the current ``time.time()``.

        Args:
            value: The measurement to record.
            **labels: Arbitrary keyword tags stored alongside the sample.
        """
        with self._lock:
            self._points.append(MetricPoint(timestamp=time.time(), value=value, labels=labels))
            self._prune()
            # Loop rather than pop once so the cap still holds if max_size
            # was lowered after samples were recorded.
            while self._points and len(self._points) > self.max_size:
                self._points.popleft()

    def _prune(self):
        """Drop samples older than the window. Caller must hold ``_lock``."""
        cutoff = time.time() - self.window_seconds
        while self._points and self._points[0].timestamp < cutoff:
            self._points.popleft()

    @staticmethod
    def _pick(sorted_values, q):
        """Return the entry of ``sorted_values`` at quantile ``q``.

        ``q`` is clamped to ``[0, 1]``: a negative ``q`` would otherwise
        yield a negative index that silently wraps around to the upper end
        of the list. An empty list gives ``0.0``.
        """
        if not sorted_values:
            return 0.0
        q = min(max(q, 0.0), 1.0)
        idx = min(int(len(sorted_values) * q), len(sorted_values) - 1)
        return sorted_values[idx]

    @property
    def count(self) -> int:
        """Number of samples currently inside the window."""
        with self._lock:
            self._prune()
            return len(self._points)

    def quantile(self, q: float) -> float:
        """Return the value at quantile ``q`` (0.5 = p50, 0.95 = p95, 0.99 = p99).

        Args:
            q: Requested quantile; values outside ``[0, 1]`` are clamped.

        Returns:
            The sample at index ``int(n * q)`` of the sorted window (capped
            at the last sample), or ``0.0`` when the window is empty.
        """
        with self._lock:
            self._prune()
            return self._pick(sorted(p.value for p in self._points), q)

    @property
    def p50(self) -> float:
        """Median of the window."""
        return self.quantile(0.5)

    @property
    def p95(self) -> float:
        """95th percentile of the window."""
        return self.quantile(0.95)

    @property
    def p99(self) -> float:
        """99th percentile of the window."""
        return self.quantile(0.99)

    @property
    def avg(self) -> float:
        """Arithmetic mean of the window, or ``0.0`` when empty."""
        with self._lock:
            self._prune()
            if not self._points:
                return 0.0
            return sum(p.value for p in self._points) / len(self._points)

    @property
    def min_val(self) -> float:
        """Smallest sample in the window, or ``0.0`` when empty."""
        with self._lock:
            self._prune()
            if not self._points:
                return 0.0
            return min(p.value for p in self._points)

    @property
    def max_val(self) -> float:
        """Largest sample in the window, or ``0.0`` when empty."""
        with self._lock:
            self._prune()
            if not self._points:
                return 0.0
            return max(p.value for p in self._points)

    def stats(self) -> dict:
        """Return all summary statistics computed from one consistent view.

        The window is pruned and copied once under the lock, so every field
        describes the same set of samples (reading each property separately
        would re-lock and re-sort per field and could interleave with
        concurrent ``observe`` calls).

        Returns:
            Dict with ``name``, ``count``, ``avg``, ``p50``, ``p95``, ``p99``,
            ``min``, ``max`` and ``window_seconds``; numeric fields are
            ``0.0`` (``count`` is ``0``) when the window is empty.
        """
        with self._lock:
            self._prune()
            raw = [p.value for p in self._points]
        ordered = sorted(raw)
        n = len(ordered)
        return {
            "name": self.name,
            "count": n,
            # Summed in insertion order, the same order the avg property uses.
            "avg": sum(raw) / n if n else 0.0,
            "p50": self._pick(ordered, 0.5),
            "p95": self._pick(ordered, 0.95),
            "p99": self._pick(ordered, 0.99),
            "min": ordered[0] if n else 0.0,
            "max": ordered[-1] if n else 0.0,
            "window_seconds": self.window_seconds,
        }

149 

150 

@dataclass
class Counter:
    """Monotonically increasing counter.

    Attributes:
        name: Identifier of the counter.
    """

    name: str
    _value: int = 0
    # Label storage; not read by any method of this class.
    _labels: dict[str, str] = field(default_factory=dict)

    def inc(self, amount: int = 1):
        """Increase the counter by ``amount``.

        Args:
            amount: Non-negative increment (defaults to 1).

        Raises:
            ValueError: If ``amount`` is negative. A counter must never go
                down; use a gauge for values that can decrease.
        """
        if amount < 0:
            raise ValueError("Counter can only be incremented by a non-negative amount")
        self._value += amount

    @property
    def value(self) -> int:
        """Current total."""
        return self._value

164 

165 

@dataclass
class Gauge:
    """Instantaneous reading that can be set, raised, or lowered.

    Attributes:
        name: Identifier of the gauge.
    """

    name: str
    _value: float = 0.0
    # Label storage; not read by any method of this class.
    _labels: dict[str, str] = field(default_factory=dict)

    @property
    def value(self) -> float:
        """Current reading."""
        return self._value

    def set(self, value: float):
        """Overwrite the reading with ``value``."""
        self._value = value

    def inc(self, amount: float = 1.0):
        """Raise the reading by ``amount`` (defaults to 1.0)."""
        self._value = self._value + amount

    def dec(self, amount: float = 1.0):
        """Lower the reading by ``amount`` (defaults to 1.0)."""
        self._value = self._value - amount

185 

186 

class MetricsCollector:
    """Unified metrics collector.

    Built-in metrics: latency histograms (step / model / tool), throughput,
    error rate, and cache hit rate.
    """

    def __init__(self, window_seconds: float = 300.0):
        """Create the built-in histograms, counters, and gauges.

        Args:
            window_seconds: Sliding-window length passed to every latency
                histogram.
        """
        self.window_seconds = window_seconds

        # Histograms
        self.latency_step = Histogram("step_latency", window_seconds)
        self.latency_model = Histogram("model_latency", window_seconds)
        self.latency_tool = Histogram("tool_latency", window_seconds)

        # Counters
        self.steps_total = Counter("steps_total")
        self.model_calls_total = Counter("model_calls_total")
        self.tool_calls_total = Counter("tool_calls_total")
        self.errors_total = Counter("errors_total")
        self.cache_hits = Counter("cache_hits")
        self.cache_misses = Counter("cache_misses")

        # Gauges
        self.active_agents = Gauge("active_agents")
        self.queue_depth = Gauge("queue_depth")
        self.memory_used_mb = Gauge("memory_used_mb")

        self._start_time = time.time()

    # ── Recording ────────────────────────────────

    def record_step_latency(self, duration_ms: float):
        """Record one step's latency and count the step."""
        self.latency_step.observe(duration_ms)
        self.steps_total.inc()

    def record_model_latency(self, duration_ms: float, model: str = ""):
        """Record one model call's latency (labelled with ``model``) and count it."""
        self.latency_model.observe(duration_ms, model=model)
        self.model_calls_total.inc()

    def record_tool_latency(self, duration_ms: float, tool: str = ""):
        """Record one tool call's latency (labelled with ``tool``) and count it."""
        self.latency_tool.observe(duration_ms, tool=tool)
        self.tool_calls_total.inc()

    def record_error(self):
        """Count one error."""
        self.errors_total.inc()

    def record_cache_hit(self):
        """Count one cache hit."""
        self.cache_hits.inc()

    def record_cache_miss(self):
        """Count one cache miss."""
        self.cache_misses.inc()

    # ── Derived Metrics ──────────────────────────

    @property
    def uptime_seconds(self) -> float:
        """Seconds elapsed (by ``time.time()``) since the collector was created."""
        return time.time() - self._start_time

    @property
    def rps(self) -> float:
        """Average step rate in steps per second since the collector was created.

        The rate is averaged over the collector's uptime, not over the
        histogram sliding window. While uptime is below one second the raw
        step count is returned instead of dividing by a tiny interval.
        Always returns a float.
        """
        elapsed = self.uptime_seconds
        steps = float(self.steps_total.value)
        if elapsed < 1:
            return steps
        return steps / elapsed

    @property
    def error_rate(self) -> float:
        """Errors divided by (steps + errors); ``0.0`` when both are zero."""
        total = self.steps_total.value + self.errors_total.value
        if total == 0:
            return 0.0
        return self.errors_total.value / total

    @property
    def cache_hit_rate(self) -> float:
        """Cache hits divided by (hits + misses); ``0.0`` when both are zero."""
        total = self.cache_hits.value + self.cache_misses.value
        if total == 0:
            return 0.0
        return self.cache_hits.value / total

    # ── Snapshot ─────────────────────────────────

    def snapshot(self) -> dict:
        """Return the current metrics as a plain dictionary.

        ``rps`` and ``cache_hit_rate`` are rounded to 2 decimals and
        ``error_rate`` to 4. Every gauge the collector owns is included,
        ``memory_used_mb`` among them.
        """
        return {
            "uptime_seconds": self.uptime_seconds,
            "throughput": {
                "rps": round(self.rps, 2),
                "steps_total": self.steps_total.value,
                "model_calls": self.model_calls_total.value,
                "tool_calls": self.tool_calls_total.value,
            },
            "latency_step_ms": self.latency_step.stats(),
            "latency_model_ms": self.latency_model.stats(),
            "latency_tool_ms": self.latency_tool.stats(),
            "error_rate": round(self.error_rate, 4),
            "errors_total": self.errors_total.value,
            "cache_hit_rate": round(self.cache_hit_rate, 2),
            "cache_hits": self.cache_hits.value,
            "cache_misses": self.cache_misses.value,
            "active_agents": self.active_agents.value,
            "queue_depth": self.queue_depth.value,
            "memory_used_mb": self.memory_used_mb.value,
        }

    def summary(self) -> str:
        """Return a multi-line human-readable summary built from ``snapshot()``."""
        s = self.snapshot()
        lookups = s['cache_hits'] + s['cache_misses']
        # Recompute from the raw counts: the snapshot's cache_hit_rate is
        # already rounded to whole percent, which would make the one-decimal
        # display below always end in ".0".
        hit_rate = s['cache_hits'] / lookups if lookups else 0.0
        lines = [
            f"运行时间: {s['uptime_seconds']:.0f}s",
            f"吞吐: {s['throughput']['rps']} rps ({s['throughput']['steps_total']} steps)",
            f"延迟: p50={s['latency_step_ms']['p50']:.0f}ms p95={s['latency_step_ms']['p95']:.0f}ms p99={s['latency_step_ms']['p99']:.0f}ms",
            f"错误率: {s['error_rate']:.2%} ({s['errors_total']} errors)",
            f"缓存命中率: {hit_rate:.1%} ({s['cache_hits']}/{lookups})",
        ]
        return "\n".join(lines)