Coverage for agentos/observability/metrics.py: 48%
170 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
"""
AgentOS v0.70 — performance metrics and observability enhancements.
Design lineage: Prometheus metrics + OpenTelemetry.

Provides:
- Latency quantiles (p50/p95/p99)
- Throughput statistics (RPS)
- Error-rate tracking
- Cache hit rate
- TTL-based (time-windowed, size-capped) sample buffers, see ``Histogram``
"""
13from __future__ import annotations
15import threading
16import time
17from collections import deque
18from dataclasses import dataclass, field
19from typing import Any
@dataclass
class MetricSnapshot:
    """Point-in-time metrics snapshot, used for export / serialization."""

    timestamp: float = field(default_factory=time.time)
    histograms: dict[str, dict] = field(default_factory=dict)
    counters: dict[str, int] = field(default_factory=dict)
    gauges: dict[str, float] = field(default_factory=dict)
    derived_metrics: dict[str, float] = field(default_factory=dict)

    def to_json(self) -> str:
        """Serialize to a compact JSON object with short keys (ts/h/c/g/d)."""
        import json
        return json.dumps({
            "ts": self.timestamp,
            "h": self.histograms,
            "c": self.counters,
            "g": self.gauges,
            "d": self.derived_metrics,
        })

    @classmethod
    def from_collector(cls, collector: "MetricsCollector") -> "MetricSnapshot":
        """Build a snapshot from exactly one ``collector.snapshot()`` call.

        Every value (histograms, counters, gauges, derived rates) is taken from
        that same dict, so the exported numbers are mutually consistent even if
        other code keeps recording while the snapshot is being assembled.
        """
        s = collector.snapshot()
        throughput = s["throughput"]
        return cls(
            histograms={
                "step": s["latency_step_ms"],
                "model": s["latency_model_ms"],
                "tool": s["latency_tool_ms"],
            },
            counters={
                "steps": throughput["steps_total"],
                "model_calls": throughput["model_calls"],
                "tool_calls": throughput["tool_calls"],
                "errors": s["errors_total"],
                "cache_hits": s["cache_hits"],
                "cache_misses": s["cache_misses"],
            },
            gauges={
                "active_agents": s["active_agents"],
                "queue_depth": s["queue_depth"],
            },
            derived_metrics={
                "rps": throughput["rps"],
                "error_rate": s["error_rate"],
                "cache_hit_rate": s["cache_hit_rate"],
            },
        )
@dataclass()
class MetricPoint:
    """A single observed sample: when it was taken, its value, and its labels."""

    # Epoch seconds at which the sample was recorded.
    timestamp: float
    # The observed measurement.
    value: float
    # Optional string labels; each instance gets its own fresh dict.
    labels: dict[str, str] = field(default_factory=lambda: {})
@dataclass
class Histogram:
    """Sliding-window histogram that reports quantiles over recent samples.

    Samples older than ``window_seconds`` (measured with wall-clock
    ``time.time()``) are dropped lazily on every read and write, and at most
    ``max_size`` samples are kept (oldest dropped first). All access to the
    sample buffer is serialized by ``_lock``.
    """

    name: str
    window_seconds: float = 300.0
    max_size: int = 10000
    _points: deque = field(default_factory=deque)
    _lock: threading.Lock = field(default_factory=threading.Lock)

    def observe(self, value: float, **labels):
        """Record one sample ``value`` with optional string ``labels``."""
        with self._lock:
            self._points.append(MetricPoint(timestamp=time.time(), value=value, labels=labels))
            self._prune()
            # Only one sample is added per call, so one eviction restores the cap.
            if len(self._points) > self.max_size:
                self._points.popleft()

    def _prune(self):
        """Drop samples older than the window. Caller must hold ``_lock``."""
        cutoff = time.time() - self.window_seconds
        while self._points and self._points[0].timestamp < cutoff:
            self._points.popleft()

    def _sorted_values(self) -> list:
        """Prune expired samples and return a sorted copy of the remaining values.

        One copy taken under the lock gives a consistent view that can serve
        several statistics without re-locking or re-sorting.
        """
        with self._lock:
            self._prune()
            return sorted(p.value for p in self._points)

    @staticmethod
    def _nearest_rank(values: list, q: float) -> float:
        """Nearest-rank q-quantile of already-sorted ``values``; 0.0 when empty.

        The result is the smallest sample with at least ``q * n`` samples at or
        below it, i.e. the element at 1-based rank ``ceil(q * n)`` (at least 1).
        """
        import math

        if not values:
            return 0.0
        rank = max(1, math.ceil(q * len(values)))
        return values[min(rank, len(values)) - 1]

    @property
    def count(self) -> int:
        """Number of samples currently inside the window."""
        with self._lock:
            self._prune()
            return len(self._points)

    def quantile(self, q: float) -> float:
        """Return the q-quantile of windowed samples (0.5=p50, 0.95=p95, 0.99=p99).

        Uses the nearest-rank definition; returns 0.0 when there are no samples.

        Raises:
            ValueError: if ``q`` is not within [0, 1].
        """
        if not 0.0 <= q <= 1.0:
            raise ValueError(f"quantile q must be within [0, 1], got {q!r}")
        return self._nearest_rank(self._sorted_values(), q)

    @property
    def p50(self) -> float:
        return self.quantile(0.5)

    @property
    def p95(self) -> float:
        return self.quantile(0.95)

    @property
    def p99(self) -> float:
        return self.quantile(0.99)

    @property
    def avg(self) -> float:
        """Mean of windowed samples; 0.0 when empty."""
        with self._lock:
            self._prune()
            if not self._points:
                return 0.0
            return sum(p.value for p in self._points) / len(self._points)

    @property
    def min_val(self) -> float:
        """Smallest windowed sample; 0.0 when empty."""
        with self._lock:
            self._prune()
            if not self._points:
                return 0.0
            return min(p.value for p in self._points)

    @property
    def max_val(self) -> float:
        """Largest windowed sample; 0.0 when empty."""
        with self._lock:
            self._prune()
            if not self._points:
                return 0.0
            return max(p.value for p in self._points)

    def stats(self) -> dict:
        """Summary statistics, all computed from ONE consistent snapshot of the window.

        Empty windows report 0 / 0.0 for every statistic.
        """
        values = self._sorted_values()
        n = len(values)
        return {
            "name": self.name,
            "count": n,
            "avg": sum(values) / n if n else 0.0,
            "p50": self._nearest_rank(values, 0.5),
            "p95": self._nearest_rank(values, 0.95),
            "p99": self._nearest_rank(values, 0.99),
            "min": values[0] if n else 0.0,
            "max": values[-1] if n else 0.0,
            "window_seconds": self.window_seconds,
        }
@dataclass
class Counter:
    """Monotonically increasing counter.

    Increments are serialized by a private lock: ``self._value += amount`` is
    a non-atomic read-modify-write in Python, so concurrent ``inc`` calls
    could otherwise lose updates.
    """

    name: str
    _value: int = 0
    _labels: dict[str, str] = field(default_factory=dict)
    # Excluded from repr/eq so both behave exactly as before the lock was added.
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False, compare=False)

    def inc(self, amount: int = 1):
        """Add ``amount`` to the counter.

        Raises:
            ValueError: if ``amount`` is negative (a counter may only go up).
        """
        if amount < 0:
            raise ValueError(
                f"Counter {self.name!r} can only be incremented by a non-negative amount, got {amount!r}"
            )
        with self._lock:
            self._value += amount

    @property
    def value(self) -> int:
        """Current counter value."""
        return self._value
@dataclass
class Gauge:
    """Gauge value that can be set, increased, or decreased.

    Mutations are serialized by a private lock. ``inc``/``dec`` are
    read-modify-write operations that could otherwise lose updates when
    called concurrently.
    """

    name: str
    _value: float = 0.0
    _labels: dict[str, str] = field(default_factory=dict)
    # Excluded from repr/eq so both behave exactly as before the lock was added.
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False, compare=False)

    def set(self, value: float):
        """Replace the current value."""
        with self._lock:
            self._value = value

    def inc(self, amount: float = 1.0):
        """Increase the value by ``amount``."""
        with self._lock:
            self._value += amount

    def dec(self, amount: float = 1.0):
        """Decrease the value by ``amount``."""
        with self._lock:
            self._value -= amount

    @property
    def value(self) -> float:
        """Current gauge value."""
        return self._value
class MetricsCollector:
    """
    Unified metrics collector.

    Built in: step/model/tool latency histograms, throughput counters,
    error rate and cache hit rate, plus active-agent, queue-depth and
    memory gauges.
    """

    def __init__(self, window_seconds: float = 300.0):
        self.window_seconds = window_seconds

        # Histograms: each keeps samples from the last `window_seconds`.
        self.latency_step = Histogram("step_latency", window_seconds)
        self.latency_model = Histogram("model_latency", window_seconds)
        self.latency_tool = Histogram("tool_latency", window_seconds)

        # Counters
        self.steps_total = Counter("steps_total")
        self.model_calls_total = Counter("model_calls_total")
        self.tool_calls_total = Counter("tool_calls_total")
        self.errors_total = Counter("errors_total")
        self.cache_hits = Counter("cache_hits")
        self.cache_misses = Counter("cache_misses")

        # Gauges
        self.active_agents = Gauge("active_agents")
        self.queue_depth = Gauge("queue_depth")
        self.memory_used_mb = Gauge("memory_used_mb")

        # Monotonic clock: elapsed uptime must not jump or go negative when
        # the system wall clock is adjusted.
        self._start_time = time.monotonic()

    # ── Recording ────────────────────────────────

    def record_step_latency(self, duration_ms: float):
        """Record one step's latency in milliseconds and count the step."""
        self.latency_step.observe(duration_ms)
        self.steps_total.inc()

    def record_model_latency(self, duration_ms: float, model: str = ""):
        """Record one model call's latency in milliseconds, labelled by ``model``."""
        self.latency_model.observe(duration_ms, model=model)
        self.model_calls_total.inc()

    def record_tool_latency(self, duration_ms: float, tool: str = ""):
        """Record one tool call's latency in milliseconds, labelled by ``tool``."""
        self.latency_tool.observe(duration_ms, tool=tool)
        self.tool_calls_total.inc()

    def record_error(self):
        """Count one error."""
        self.errors_total.inc()

    def record_cache_hit(self):
        """Count one cache hit."""
        self.cache_hits.inc()

    def record_cache_miss(self):
        """Count one cache miss."""
        self.cache_misses.inc()

    # ── Derived Metrics ──────────────────────────

    @property
    def uptime_seconds(self) -> float:
        """Seconds elapsed since this collector was created (monotonic clock)."""
        return time.monotonic() - self._start_time

    @staticmethod
    def _rate(count: float, elapsed: float) -> float:
        """Return ``count / elapsed`` as a float, treating elapsed times under 1s as 1s."""
        return float(count) / max(elapsed, 1.0)

    @property
    def rps(self) -> float:
        """Average steps per second over the collector's whole lifetime.

        This is ``steps_total / uptime``, not a windowed rate. During the first
        second the elapsed time is treated as 1s, so the raw step count is
        returned.
        """
        return self._rate(self.steps_total.value, self.uptime_seconds)

    @property
    def error_rate(self) -> float:
        """``errors_total / (steps_total + errors_total)``; 0.0 when both are zero.

        NOTE(review): this treats errors as outcomes separate from recorded
        steps. If callers also record a step latency for failed steps, the
        denominator double-counts them. Confirm against callers.
        """
        total = self.steps_total.value + self.errors_total.value
        if total == 0:
            return 0.0
        return self.errors_total.value / total

    @property
    def cache_hit_rate(self) -> float:
        """``hits / (hits + misses)``; 0.0 when there were no lookups."""
        total = self.cache_hits.value + self.cache_misses.value
        if total == 0:
            return 0.0
        return self.cache_hits.value / total

    # ── Snapshot ─────────────────────────────────

    def snapshot(self) -> dict:
        """Return all metrics as a plain dict.

        Uptime and the step count are read once, so ``uptime_seconds`` and
        ``throughput.rps`` agree with each other.
        """
        uptime = self.uptime_seconds
        steps = self.steps_total.value
        return {
            "uptime_seconds": uptime,
            "throughput": {
                "rps": round(self._rate(steps, uptime), 2),
                "steps_total": steps,
                "model_calls": self.model_calls_total.value,
                "tool_calls": self.tool_calls_total.value,
            },
            "latency_step_ms": self.latency_step.stats(),
            "latency_model_ms": self.latency_model.stats(),
            "latency_tool_ms": self.latency_tool.stats(),
            "error_rate": round(self.error_rate, 4),
            "errors_total": self.errors_total.value,
            # 4 decimals (same as error_rate) so the 0.1%-precision display in
            # summary() is not truncated to whole percents.
            "cache_hit_rate": round(self.cache_hit_rate, 4),
            "cache_hits": self.cache_hits.value,
            "cache_misses": self.cache_misses.value,
            "active_agents": self.active_agents.value,
            "queue_depth": self.queue_depth.value,
            "memory_used_mb": self.memory_used_mb.value,
        }

    def summary(self) -> str:
        """Human-readable multi-line summary (uptime, throughput, step latency, errors, cache)."""
        s = self.snapshot()
        lines = [
            f"运行时间: {s['uptime_seconds']:.0f}s",
            f"吞吐: {s['throughput']['rps']} rps ({s['throughput']['steps_total']} steps)",
            f"延迟: p50={s['latency_step_ms']['p50']:.0f}ms p95={s['latency_step_ms']['p95']:.0f}ms p99={s['latency_step_ms']['p99']:.0f}ms",
            f"错误率: {s['error_rate']:.2%} ({s['errors_total']} errors)",
            f"缓存命中率: {s['cache_hit_rate']:.1%} ({s['cache_hits']}/{s['cache_hits'] + s['cache_misses']})",
        ]
        return "\n".join(lines)