Coverage for agentos/tools/metrics.py: 97%
199 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 07:17 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 07:17 +0800
"""
Metrics Collector for AgentOS.

Lightweight in-process metrics with Counter, Gauge, Histogram, and Timer.
Thread-safe, zero external dependencies, Prometheus-style text exposition.
"""
8import threading
9import time
10from collections import defaultdict
11from dataclasses import dataclass, field
12from typing import Any, Callable, Dict, List, Optional, Set, Union
15# ============================================================================
16# Histogram buckets (pre-defined for common latency ranges)
17# ============================================================================
# Fine-grained upper bounds used as the default bucket layout for histograms
# and timers; the range runs from 0.005 up to 10.0.
DEFAULT_BUCKETS = (
    0.005,
    0.01,
    0.025,
    0.05,
    0.1,
    0.25,
    0.5,
    1.0,
    2.5,
    5.0,
    10.0,
)

# Coarser upper bounds for much larger values; the range runs from 0.1 up to
# 3600.0.
LARGE_BUCKETS = (
    0.1,
    0.5,
    1.0,
    5.0,
    10.0,
    30.0,
    60.0,
    300.0,
    600.0,
    1800.0,
    3600.0,
)
27# ============================================================================
28# Metric types
29# ============================================================================
class Counter:
    """Monotonically increasing counter. Thread-safe.

    Every read and write of the value happens under a per-instance lock.

    Note: ``inc`` does not validate ``delta``; callers are expected to pass
    non-negative amounts to keep the counter monotonic.
    """

    def __init__(self, name: str, help_text: str = "", labels: Optional[Dict[str, str]] = None):
        """Create a counter starting at 0.0.

        Args:
            name: Metric name.
            help_text: Human-readable description, stored as ``self.help``.
            labels: Optional label name -> value mapping; defaults to empty.
        """
        self.name = name
        self.help = help_text
        self.labels = labels or {}
        self._value: float = 0.0
        self._lock = threading.Lock()

    def inc(self, delta: float = 1.0) -> None:
        """Add ``delta`` (default 1.0) to the counter."""
        with self._lock:
            self._value += delta

    def get(self) -> float:
        """Return the current value."""
        with self._lock:
            return self._value

    @staticmethod
    def _escape_label_value(value: Any) -> str:
        """Escape backslash, double quote and newline inside a label value."""
        # Backslashes go first so the escapes added afterwards are not doubled.
        return (
            str(value)
            .replace("\\", "\\\\")
            .replace('"', '\\"')
            .replace("\n", "\\n")
        )

    def _format_labels(self) -> str:
        """Render the labels as ``{k="v",...}``, or ``""`` when there are none."""
        if not self.labels:
            return ""
        pairs = (
            f'{key}="{self._escape_label_value(value)}"'
            for key, value in self.labels.items()
        )
        return "{" + ",".join(pairs) + "}"
class Gauge:
    """Value that can go up and down. Thread-safe.

    Every read and write of the value happens under a per-instance lock.
    """

    def __init__(self, name: str, help_text: str = "", labels: Optional[Dict[str, str]] = None):
        """Create a gauge starting at 0.0.

        Args:
            name: Metric name.
            help_text: Human-readable description, stored as ``self.help``.
            labels: Optional label name -> value mapping; defaults to empty.
        """
        self.name = name
        self.help = help_text
        self.labels = labels or {}
        self._value: float = 0.0
        self._lock = threading.Lock()

    def set(self, value: float) -> None:
        """Replace the current value with ``value``."""
        with self._lock:
            self._value = value

    def inc(self, delta: float = 1.0) -> None:
        """Add ``delta`` (default 1.0) to the current value."""
        with self._lock:
            self._value += delta

    def dec(self, delta: float = 1.0) -> None:
        """Subtract ``delta`` (default 1.0) from the current value."""
        with self._lock:
            self._value -= delta

    def get(self) -> float:
        """Return the current value."""
        with self._lock:
            return self._value

    @staticmethod
    def _escape_label_value(value: Any) -> str:
        """Escape backslash, double quote and newline inside a label value."""
        # Backslashes go first so the escapes added afterwards are not doubled.
        return (
            str(value)
            .replace("\\", "\\\\")
            .replace('"', '\\"')
            .replace("\n", "\\n")
        )

    def _format_labels(self) -> str:
        """Render the labels as ``{k="v",...}``, or ``""`` when there are none."""
        if not self.labels:
            return ""
        pairs = (
            f'{key}="{self._escape_label_value(value)}"'
            for key, value in self.labels.items()
        )
        return "{" + ",".join(pairs) + "}"
class Histogram:
    """Bucketed histogram with sum and count. Thread-safe.

    Each observation is counted in exactly one slot: the first bucket whose
    upper bound is >= the value, or the trailing ``+Inf`` slot when it exceeds
    every bound. Stored counts are therefore per-bucket, not cumulative.
    """

    def __init__(
        self,
        name: str,
        help_text: str = "",
        labels: Optional[Dict[str, str]] = None,
        buckets: tuple = DEFAULT_BUCKETS,
    ):
        """Create an empty histogram.

        Args:
            name: Metric name.
            help_text: Human-readable description, stored as ``self.help``.
            labels: Optional label name -> value mapping; defaults to empty.
            buckets: Upper bounds; sorted ascending and stored as a tuple.
        """
        self.name = name
        self.help = help_text
        self.labels = labels or {}
        self.buckets = tuple(sorted(buckets))
        self._lock = threading.Lock()
        # One slot per bound plus a final slot for the +Inf overflow bucket.
        self._bucket_counts: List[int] = [0] * (len(self.buckets) + 1)
        self._sum: float = 0.0
        self._count: int = 0

    def observe(self, value: float) -> None:
        """Record one observation of ``value``."""
        with self._lock:
            self._sum += value
            self._count += 1
            for index, bound in enumerate(self.buckets):
                if value <= bound:
                    self._bucket_counts[index] += 1
                    return
            self._bucket_counts[-1] += 1  # +Inf bucket

    def get(self) -> Dict[str, Any]:
        """Return a snapshot with ``sum``, ``count`` and per-bucket ``buckets``.

        ``buckets`` maps each upper bound (and the string ``"+Inf"``) to the
        number of observations counted in that slot.
        """
        with self._lock:
            return {
                "sum": self._sum,
                "count": self._count,
                "buckets": dict(zip(self.buckets + ("+Inf",), self._bucket_counts)),
            }

    def p50(self) -> float:
        """Return the bucket-bound estimate of the 50th percentile."""
        return self._percentile(0.50)

    def p90(self) -> float:
        """Return the bucket-bound estimate of the 90th percentile."""
        return self._percentile(0.90)

    def p99(self) -> float:
        """Return the bucket-bound estimate of the 99th percentile."""
        return self._percentile(0.99)

    def _percentile(self, p: float) -> float:
        """Estimate the ``p`` quantile as the upper bound of its bucket.

        Uses the nearest-rank method: the result is the bound of the first
        bucket whose cumulative count reaches ``ceil(count * p)``. Returns 0.0
        when nothing has been observed. When the rank falls in the +Inf slot
        the largest finite bound is returned (0.0 if there are no bounds).
        """
        import math  # local import: keeps the block self-contained

        with self._lock:
            if self._count == 0:
                return 0.0
            # The rank must be at least 1. Truncating (int(count * p)) gives 0
            # for small counts, which matched the first bucket even when it
            # was empty. The small epsilon absorbs float noise such as
            # 0.07 * 100 == 7.000000000000001.
            target = max(1, math.ceil(self._count * p - 1e-9))
            accumulated = 0
            for index, bucket_count in enumerate(self._bucket_counts):
                accumulated += bucket_count
                if accumulated >= target:
                    if index < len(self.buckets):
                        return self.buckets[index]
                    break  # rank is in the +Inf slot
            return self.buckets[-1] if self.buckets else 0.0

    @staticmethod
    def _escape_label_value(value: Any) -> str:
        """Escape backslash, double quote and newline inside a label value."""
        # Backslashes go first so the escapes added afterwards are not doubled.
        return (
            str(value)
            .replace("\\", "\\\\")
            .replace('"', '\\"')
            .replace("\n", "\\n")
        )

    def _format_labels(self) -> str:
        """Render the labels as ``{k="v",...}``, or ``""`` when there are none."""
        if not self.labels:
            return ""
        pairs = (
            f'{key}="{self._escape_label_value(value)}"'
            for key, value in self.labels.items()
        )
        return "{" + ",".join(pairs) + "}"
class Timer:
    """Convenience wrapper: Histogram for timing. Also tracks rate via internal counter.

    Durations are measured with ``time.perf_counter()`` and recorded in
    ``self.histogram``; each recording also increments a call counter named
    ``<name>_total``.

    As a context manager, start times are kept on a per-thread stack, so one
    Timer can be shared between threads and nested within a thread without
    the measurements overwriting each other. ``__enter__`` and ``__exit__``
    must run on the same thread.
    """

    def __init__(
        self,
        name: str,
        help_text: str = "",
        labels: Optional[Dict[str, str]] = None,
        buckets: tuple = DEFAULT_BUCKETS,
    ):
        """Create a timer.

        Args:
            name: Metric name for the histogram; the counter is ``name + "_total"``.
            help_text: Human-readable description, stored as ``self.help``.
            labels: Optional label mapping passed to both underlying metrics.
            buckets: Histogram upper bounds.
        """
        self.name = name
        self.help = help_text
        self.histogram = Histogram(name, help_text, labels, buckets)
        self._call_count = Counter(name + "_total", help_text, labels)
        # Per-thread stack of start times for context-manager use. A single
        # shared attribute would be overwritten by concurrent or nested blocks.
        self._local = threading.local()

    def _record(self, elapsed: float) -> None:
        """Record one measured duration and count the call."""
        self.histogram.observe(elapsed)
        self._call_count.inc()

    def time(self, fn: Callable[..., Any], *args, **kwargs) -> Any:
        """Call ``fn(*args, **kwargs)``, record its duration, return its result.

        The duration is recorded even when ``fn`` raises; the exception
        propagates to the caller.
        """
        start = time.perf_counter()
        try:
            return fn(*args, **kwargs)
        finally:
            self._record(time.perf_counter() - start)

    def __enter__(self):
        """Start timing a ``with`` block and return this timer."""
        starts = getattr(self._local, "starts", None)
        if starts is None:
            starts = self._local.starts = []
        starts.append(time.perf_counter())
        return self

    def __exit__(self, *args):
        """Stop timing and record the duration; exceptions are not suppressed."""
        started = self._local.starts.pop()
        self._record(time.perf_counter() - started)

    def get(self) -> Dict[str, Any]:
        """Return ``{"histogram": <histogram snapshot>, "count": <call count>}``."""
        return {
            "histogram": self.histogram.get(),
            "count": self._call_count.get(),
        }
193# ============================================================================
194# MetricsCollector (Registry)
195# ============================================================================
class MetricsCollector:
    """Global registry for metrics. Provides Prometheus text format exposition.

    Metrics are stored by full name (``<namespace>_<name>`` when a namespace
    is set). The factory methods are get-or-create: a second call with the
    same name returns the existing object, whatever its type and whatever
    other arguments were passed.
    """

    def __init__(self, namespace: str = ""):
        """Create an empty registry.

        Args:
            namespace: Optional prefix joined to metric names with ``_``.
        """
        self.namespace = namespace
        self._metrics: Dict[str, Any] = {}
        self._lock = threading.Lock()

    def _full_name(self, name: str) -> str:
        """Return ``name`` prefixed with the namespace, if one is set."""
        if self.namespace:
            return f"{self.namespace}_{name}"
        return name

    def _get_or_create(self, name: str, factory: Callable[[str], Any]) -> Any:
        """Return the metric registered under ``name``, creating it if absent.

        ``factory`` receives the full (namespaced) name and is called only
        when no metric is registered under that name yet.
        """
        full = self._full_name(name)
        with self._lock:
            if full not in self._metrics:
                self._metrics[full] = factory(full)
            return self._metrics[full]

    def counter(self, name: str, help_text: str = "", labels: Optional[Dict[str, str]] = None) -> Counter:
        """Get or create the Counter registered under ``name``."""
        return self._get_or_create(name, lambda full: Counter(full, help_text, labels))

    def gauge(self, name: str, help_text: str = "", labels: Optional[Dict[str, str]] = None) -> Gauge:
        """Get or create the Gauge registered under ``name``."""
        return self._get_or_create(name, lambda full: Gauge(full, help_text, labels))

    def histogram(
        self,
        name: str,
        help_text: str = "",
        labels: Optional[Dict[str, str]] = None,
        buckets: tuple = DEFAULT_BUCKETS,
    ) -> Histogram:
        """Get or create the Histogram registered under ``name``."""
        return self._get_or_create(
            name, lambda full: Histogram(full, help_text, labels, buckets)
        )

    def timer(
        self,
        name: str,
        help_text: str = "",
        labels: Optional[Dict[str, str]] = None,
        buckets: tuple = DEFAULT_BUCKETS,
    ) -> Timer:
        """Get or create the Timer registered under ``name``."""
        return self._get_or_create(
            name, lambda full: Timer(full, help_text, labels, buckets)
        )

    def get(self, name: str) -> Optional[Any]:
        """Return the metric registered under ``name``, or None if absent."""
        with self._lock:
            return self._metrics.get(self._full_name(name))

    def list_metrics(self) -> List[str]:
        """Return the full names of all registered metrics."""
        with self._lock:
            return list(self._metrics.keys())

    def get_all(self) -> Dict[str, Any]:
        """Return raw values for all metrics (for programmatic use)."""
        with self._lock:
            return {
                name: metric.get()
                for name, metric in self._metrics.items()
                if hasattr(metric, "get")
            }

    @staticmethod
    def _escape_label_value(value: Any) -> str:
        """Escape backslash, double quote and newline inside a label value."""
        # Backslashes go first so the escapes added afterwards are not doubled.
        return (
            str(value)
            .replace("\\", "\\\\")
            .replace('"', '\\"')
            .replace("\n", "\\n")
        )

    @classmethod
    def _render_labels(cls, labels: Dict[str, Any]) -> str:
        """Render a label mapping as ``{k="v",...}``, or ``""`` when empty."""
        if not labels:
            return ""
        pairs = (
            f'{key}="{cls._escape_label_value(value)}"'
            for key, value in labels.items()
        )
        return "{" + ",".join(pairs) + "}"

    @classmethod
    def _histogram_lines(cls, name: str, hist: Any) -> List[str]:
        """Build the ``_count``, ``_sum`` and ``_bucket`` sample lines for a histogram."""
        snapshot = hist.get()
        base_labels = cls._render_labels(hist.labels)
        lines = [
            f"{name}_count{base_labels} {snapshot['count']}",
            f"{name}_sum{base_labels} {snapshot['sum']}",
        ]
        # The histogram stores per-bucket counts, but `le` series must be
        # cumulative, so a running total is emitted instead.
        running = 0
        for bound in hist.buckets + ("+Inf",):
            running += snapshot["buckets"].get(bound, 0)
            # `le` is merged into the metric's own labels so each sample has a
            # single {...} group.
            bucket_labels = cls._render_labels({**hist.labels, "le": str(bound)})
            lines.append(f"{name}_bucket{bucket_labels} {running}")
        return lines

    def to_prometheus(self) -> str:
        """Export all metrics in Prometheus text format.

        Emits an optional ``# HELP`` line and a ``# TYPE`` line per metric,
        followed by its samples. Timers are exported as histograms, using
        their underlying histogram's data.
        """
        # Snapshot the registry so the lock is not held while rendering.
        with self._lock:
            registered = list(self._metrics.items())

        lines: List[str] = []
        for name, metric in registered:
            if metric.help:
                # Escape backslash and newline so HELP stays on one line.
                help_text = str(metric.help).replace("\\", "\\\\").replace("\n", "\\n")
                lines.append(f"# HELP {name} {help_text}")

            if isinstance(metric, (Histogram, Timer)):
                lines.append(f"# TYPE {name} histogram")
                hist = metric.histogram if isinstance(metric, Timer) else metric
                lines.extend(self._histogram_lines(name, hist))
            elif isinstance(metric, Gauge):
                lines.append(f"# TYPE {name} gauge")
                lines.append(f"{name}{self._render_labels(metric.labels)} {metric.get()}")
            else:
                lines.append(f"# TYPE {name} counter")
                if isinstance(metric, Counter):
                    lines.append(f"{name}{self._render_labels(metric.labels)} {metric.get()}")

        return "\n".join(lines) + "\n"
306# ============================================================================
307# Global singleton
308# ============================================================================
# Process-wide collector instance, created on first request.
_default_collector: Optional[MetricsCollector] = None
# Serializes creation of the instance above.
_collector_lock = threading.Lock()


def get_metrics_collector(namespace: str = "") -> MetricsCollector:
    """Return the shared MetricsCollector, creating it on first use.

    Args:
        namespace: Namespace for the collector when this call creates it;
            an empty value falls back to ``"agentos"``. Ignored once the
            shared collector exists.

    Returns:
        The module-wide MetricsCollector instance.
    """
    global _default_collector
    if _default_collector is not None:
        return _default_collector
    with _collector_lock:
        # Check again under the lock: another thread may have created it
        # between the unlocked check and acquiring the lock.
        if _default_collector is None:
            effective_namespace = namespace or "agentos"
            _default_collector = MetricsCollector(namespace=effective_namespace)
        return _default_collector
320 return _default_collector