Coverage for agentos/tools/metrics.py: 97%

199 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-03 07:17 +0800

1""" 

2Metrics Collector for AgentOS. 

3 

4Lightweight in-process metrics with Counter, Gauge, Histogram, and Timer. 

5Thread-safe, zero external dependencies, Prometheus-style text exposition. 

6""" 

7 

8import threading 

9import time 

10from collections import defaultdict 

11from dataclasses import dataclass, field 

12from typing import Any, Callable, Dict, List, Optional, Set, Union 

13 

14 

15# ============================================================================ 

16# Histogram buckets (pre-defined for common latency ranges) 

17# ============================================================================ 

18 

# Default upper bounds, used when a histogram or timer is created without
# an explicit ``buckets`` argument.
DEFAULT_BUCKETS = (
    0.005,
    0.01,
    0.025,
    0.05,
    0.1,
    0.25,
    0.5,
    1.0,
    2.5,
    5.0,
    10.0,
)

# Alternative, coarser set of upper bounds reaching up to 3600.0.
LARGE_BUCKETS = (
    0.1,
    0.5,
    1.0,
    5.0,
    10.0,
    30.0,
    60.0,
    300.0,
    600.0,
    1800.0,
    3600.0,
)

25 

26 

27# ============================================================================ 

28# Metric types 

29# ============================================================================ 

30 

class Counter:
    """Monotonically increasing counter. Thread-safe.

    Args:
        name: Metric name.
        help_text: Optional description, stored on the ``help`` attribute.
        labels: Optional mapping of label names to values; defaults to an
            empty dict.
    """

    def __init__(self, name: str, help_text: str = "", labels: Optional[Dict[str, str]] = None):
        self.name = name
        self.help = help_text
        self.labels = labels or {}
        self._value: float = 0.0
        self._lock = threading.Lock()

    def inc(self, delta: float = 1.0) -> None:
        """Add ``delta`` to the counter while holding the lock.

        The sign of ``delta`` is not checked here.
        """
        with self._lock:
            self._value += delta

    def get(self) -> float:
        """Return the current value, read under the lock."""
        with self._lock:
            return self._value

    @staticmethod
    def _escape_label_value(value: Any) -> str:
        """Escape backslash, newline and double quote in a label value."""
        # Backslash must be handled first so the escapes added for the
        # other two characters are not themselves doubled.
        return (
            str(value)
            .replace("\\", "\\\\")
            .replace("\n", "\\n")
            .replace('"', '\\"')
        )

    def _format_labels(self) -> str:
        """Render the labels as ``{k="v",...}``, or ``""`` when there are none."""
        if not self.labels:
            return ""
        pairs = (
            f'{key}="{self._escape_label_value(value)}"'
            for key, value in self.labels.items()
        )
        return "{" + ",".join(pairs) + "}"

53 

54 

class Gauge:
    """Value that can go up and down. Thread-safe.

    Args:
        name: Metric name.
        help_text: Optional description, stored on the ``help`` attribute.
        labels: Optional mapping of label names to values; defaults to an
            empty dict.
    """

    def __init__(self, name: str, help_text: str = "", labels: Optional[Dict[str, str]] = None):
        self.name = name
        self.help = help_text
        self.labels = labels or {}
        self._value: float = 0.0
        self._lock = threading.Lock()

    def set(self, value: float) -> None:
        """Replace the current value with ``value``."""
        with self._lock:
            self._value = value

    def inc(self, delta: float = 1.0) -> None:
        """Add ``delta`` to the current value."""
        with self._lock:
            self._value += delta

    def dec(self, delta: float = 1.0) -> None:
        """Subtract ``delta`` from the current value."""
        with self._lock:
            self._value -= delta

    def get(self) -> float:
        """Return the current value, read under the lock."""
        with self._lock:
            return self._value

    @staticmethod
    def _escape_label_value(value: Any) -> str:
        """Escape backslash, newline and double quote in a label value."""
        # Backslash must be handled first so the escapes added for the
        # other two characters are not themselves doubled.
        return (
            str(value)
            .replace("\\", "\\\\")
            .replace("\n", "\\n")
            .replace('"', '\\"')
        )

    def _format_labels(self) -> str:
        """Render the labels as ``{k="v",...}``, or ``""`` when there are none."""
        if not self.labels:
            return ""
        pairs = (
            f'{key}="{self._escape_label_value(value)}"'
            for key, value in self.labels.items()
        )
        return "{" + ",".join(pairs) + "}"

85 

86 

class Histogram:
    """Bucketed histogram with sum and count. Thread-safe.

    Each observation is counted in exactly one bucket: the first one whose
    upper bound is >= the value, or the trailing ``+Inf`` bucket when no
    bound is large enough. Counts are therefore stored per bucket, not
    cumulatively.

    Args:
        name: Metric name.
        help_text: Optional description, stored on the ``help`` attribute.
        labels: Optional mapping of label names to values; defaults to an
            empty dict.
        buckets: Upper bounds. They are sorted and de-duplicated on
            construction.
    """

    def __init__(
        self,
        name: str,
        help_text: str = "",
        labels: Optional[Dict[str, str]] = None,
        buckets: tuple = DEFAULT_BUCKETS,
    ):
        self.name = name
        self.help = help_text
        self.labels = labels or {}
        # Duplicate bounds are dropped: a repeated bound would never receive
        # an observation, and its zero count would overwrite the real count
        # when get() builds a dict keyed by bound.
        self.buckets = tuple(sorted(set(buckets)))
        self._lock = threading.Lock()
        self._bucket_counts: List[int] = [0] * (len(self.buckets) + 1)  # +1 for +Inf
        self._sum: float = 0.0
        self._count: int = 0

    def observe(self, value: float) -> None:
        """Record one observation in the sum, the count and a single bucket."""
        with self._lock:
            self._sum += value
            self._count += 1
            for i, bound in enumerate(self.buckets):
                if value <= bound:
                    self._bucket_counts[i] += 1
                    return
            self._bucket_counts[-1] += 1  # +Inf bucket

    def get(self) -> Dict[str, Any]:
        """Return a snapshot with keys ``sum``, ``count`` and ``buckets``.

        ``buckets`` maps each upper bound, plus the string ``"+Inf"``, to the
        number of observations counted in that bucket alone.
        """
        with self._lock:
            return {
                "sum": self._sum,
                "count": self._count,
                "buckets": dict(zip(self.buckets + ("+Inf",), self._bucket_counts)),
            }

    def p50(self) -> float:
        """Return the bucket-based estimate of the 50th percentile."""
        return self._percentile(0.50)

    def p90(self) -> float:
        """Return the bucket-based estimate of the 90th percentile."""
        return self._percentile(0.90)

    def p99(self) -> float:
        """Return the bucket-based estimate of the 99th percentile."""
        return self._percentile(0.99)

    def _percentile(self, p: float) -> float:
        """Return the upper bound of the bucket containing quantile ``p``.

        Returns 0.0 when nothing has been observed. When the quantile falls
        in the ``+Inf`` bucket the largest finite bound is returned (0.0 if
        there are no finite bounds).
        """
        with self._lock:
            if self._count == 0:
                return 0.0
            threshold = self._count * p
            accumulated = 0
            for i, bucket_count in enumerate(self._bucket_counts):
                accumulated += bucket_count
                # Comparing against the untruncated threshold is equivalent
                # to rounding the rank up; "accumulated > 0" keeps a small
                # threshold from matching a leading run of empty buckets.
                if accumulated > 0 and accumulated >= threshold:
                    if i < len(self.buckets):
                        return self.buckets[i]
                    break
            return self.buckets[-1] if self.buckets else 0.0

    @staticmethod
    def _escape_label_value(value: Any) -> str:
        """Escape backslash, newline and double quote in a label value."""
        # Backslash must be handled first so the escapes added for the
        # other two characters are not themselves doubled.
        return (
            str(value)
            .replace("\\", "\\\\")
            .replace("\n", "\\n")
            .replace('"', '\\"')
        )

    def _format_labels(self) -> str:
        """Render the labels as ``{k="v",...}``, or ``""`` when there are none."""
        if not self.labels:
            return ""
        pairs = (
            f'{key}="{self._escape_label_value(value)}"'
            for key, value in self.labels.items()
        )
        return "{" + ",".join(pairs) + "}"

151 

152 

class Timer:
    """Convenience wrapper: Histogram for timing. Also tracks rate via internal counter.

    Durations are measured with ``time.perf_counter()`` and recorded in
    ``self.histogram``; every recorded duration also increments an internal
    counter named ``name + "_total"``.

    A Timer can be used either by passing a callable to ``time()`` or as a
    context manager. Context-manager start times are kept on a per-thread
    stack, so overlapping ``with`` blocks on the same Timer (from different
    threads, or nested in one thread) each measure their own duration.

    Args:
        name: Metric name, shared with the underlying histogram.
        help_text: Optional description, stored on the ``help`` attribute.
        labels: Optional labels passed to the histogram and the counter.
        buckets: Upper bounds passed to the histogram.
    """

    def __init__(
        self,
        name: str,
        help_text: str = "",
        labels: Optional[Dict[str, str]] = None,
        buckets: tuple = DEFAULT_BUCKETS,
    ):
        self.name = name
        self.help = help_text
        self.histogram = Histogram(name, help_text, labels, buckets)
        self._call_count = Counter(name + "_total", help_text, labels)
        # Per-thread storage for context-manager start times.
        self._local = threading.local()

    def time(self, fn: Callable[..., Any], *args, **kwargs) -> Any:
        """Call ``fn(*args, **kwargs)`` and record how long it took.

        The duration is recorded whether ``fn`` returns or raises; an
        exception from ``fn`` propagates to the caller.
        """
        start = time.perf_counter()
        try:
            return fn(*args, **kwargs)
        finally:
            elapsed = time.perf_counter() - start
            self.histogram.observe(elapsed)
            self._call_count.inc()

    def __enter__(self):
        """Push the current time onto this thread's stack of start times."""
        starts = getattr(self._local, "starts", None)
        if starts is None:
            starts = self._local.starts = []
        starts.append(time.perf_counter())
        return self

    def __exit__(self, *args):
        """Pop the matching start time and record the elapsed duration."""
        elapsed = time.perf_counter() - self._local.starts.pop()
        self.histogram.observe(elapsed)
        self._call_count.inc()

    def get(self) -> Dict[str, Any]:
        """Return ``{"histogram": <histogram snapshot>, "count": <calls recorded>}``."""
        return {
            "histogram": self.histogram.get(),
            "count": self._call_count.get(),
        }

191 

192 

193# ============================================================================ 

194# MetricsCollector (Registry) 

195# ============================================================================ 

196 

class MetricsCollector:
    """Global registry for metrics. Provides Prometheus text format exposition.

    Metrics are stored by full name (``<namespace>_<name>`` when a namespace
    is set). Requesting a name that is already registered returns the
    existing object, whatever its type, and ignores the other arguments.

    Args:
        namespace: Optional prefix joined to every metric name with ``_``.
    """

    def __init__(self, namespace: str = ""):
        self.namespace = namespace
        self._metrics: Dict[str, Any] = {}
        self._lock = threading.Lock()

    def _full_name(self, name: str) -> str:
        """Return ``name`` prefixed with the namespace, if one is set."""
        if self.namespace:
            return f"{self.namespace}_{name}"
        return name

    def _get_or_create(self, name: str, factory: Callable[[str], Any]) -> Any:
        """Return the metric registered under ``name``, creating it if absent.

        ``factory`` receives the full metric name and is only called when no
        metric is registered under that name yet.
        """
        full = self._full_name(name)
        with self._lock:
            if full not in self._metrics:
                self._metrics[full] = factory(full)
            return self._metrics[full]

    def counter(self, name: str, help_text: str = "", labels: Optional[Dict[str, str]] = None) -> Counter:
        """Return the metric registered under ``name``, creating a Counter if absent."""
        return self._get_or_create(name, lambda full: Counter(full, help_text, labels))

    def gauge(self, name: str, help_text: str = "", labels: Optional[Dict[str, str]] = None) -> Gauge:
        """Return the metric registered under ``name``, creating a Gauge if absent."""
        return self._get_or_create(name, lambda full: Gauge(full, help_text, labels))

    def histogram(
        self,
        name: str,
        help_text: str = "",
        labels: Optional[Dict[str, str]] = None,
        buckets: tuple = DEFAULT_BUCKETS,
    ) -> Histogram:
        """Return the metric registered under ``name``, creating a Histogram if absent."""
        return self._get_or_create(
            name, lambda full: Histogram(full, help_text, labels, buckets)
        )

    def timer(
        self,
        name: str,
        help_text: str = "",
        labels: Optional[Dict[str, str]] = None,
        buckets: tuple = DEFAULT_BUCKETS,
    ) -> Timer:
        """Return the metric registered under ``name``, creating a Timer if absent."""
        return self._get_or_create(
            name, lambda full: Timer(full, help_text, labels, buckets)
        )

    def get(self, name: str) -> Optional[Any]:
        """Return the metric registered under ``name``, or ``None``."""
        full = self._full_name(name)
        # Read under the same lock the other registry accessors use.
        with self._lock:
            return self._metrics.get(full)

    def list_metrics(self) -> List[str]:
        """Return the full names of all registered metrics."""
        with self._lock:
            return list(self._metrics.keys())

    def get_all(self) -> Dict[str, Any]:
        """Return raw values for all metrics (for programmatic use)."""
        result = {}
        with self._lock:
            for name, metric in self._metrics.items():
                if hasattr(metric, "get"):
                    result[name] = metric.get()
        return result

    @staticmethod
    def _escape_label_value(value: Any) -> str:
        """Escape backslash, newline and double quote in a label value."""
        # Backslash must be handled first so the escapes added for the
        # other two characters are not themselves doubled.
        return (
            str(value)
            .replace("\\", "\\\\")
            .replace("\n", "\\n")
            .replace('"', '\\"')
        )

    @classmethod
    def _render_labels(cls, labels: Dict[str, str], le: Any = None) -> str:
        """Render ``labels`` (plus an optional ``le`` label) as one brace group.

        Returns ``""`` when there is nothing to render. ``le`` is appended
        inside the same braces so a labelled histogram bucket does not end
        up with two separate ``{...}`` groups.
        """
        pairs = [
            f'{key}="{cls._escape_label_value(value)}"'
            for key, value in labels.items()
        ]
        if le is not None:
            pairs.append(f'le="{le}"')
        if not pairs:
            return ""
        return "{" + ",".join(pairs) + "}"

    @classmethod
    def _histogram_lines(cls, name: str, histogram: Any) -> List[str]:
        """Return the ``_bucket``, ``_sum`` and ``_count`` sample lines."""
        data = histogram.get()  # one snapshot, so all lines agree
        lines = []
        running = 0
        for bound in histogram.buckets + ("+Inf",):
            # The snapshot holds per-bucket counts; the text format expects
            # each "le" bucket to include everything at or below its bound.
            running += data["buckets"].get(bound, 0)
            lines.append(
                f"{name}_bucket{cls._render_labels(histogram.labels, le=bound)} {running}"
            )
        base = cls._render_labels(histogram.labels)
        lines.append(f"{name}_sum{base} {data['sum']}")
        lines.append(f"{name}_count{base} {data['count']}")
        return lines

    def to_prometheus(self) -> str:
        """Export all metrics in Prometheus text format.

        Counters and gauges produce one sample line each. Histograms and
        timers produce cumulative ``_bucket`` lines followed by ``_sum`` and
        ``_count``; for a timer the values come from its ``histogram``.
        The result always ends with a newline.
        """
        lines: List[str] = []
        with self._lock:
            for name, metric in self._metrics.items():
                if metric.help:
                    # HELP text is a single line: escape backslash and newline.
                    help_text = metric.help.replace("\\", "\\\\").replace("\n", "\\n")
                    lines.append(f"# HELP {name} {help_text}")

                if isinstance(metric, (Histogram, Timer)):
                    type_name = "histogram"
                elif isinstance(metric, Gauge):
                    type_name = "gauge"
                else:
                    type_name = "counter"
                lines.append(f"# TYPE {name} {type_name}")

                if isinstance(metric, (Counter, Gauge)):
                    lbl = self._render_labels(metric.labels)
                    lines.append(f"{name}{lbl} {metric.get()}")
                elif isinstance(metric, Histogram):
                    lines.extend(self._histogram_lines(name, metric))
                elif isinstance(metric, Timer):
                    lines.extend(self._histogram_lines(name, metric.histogram))

        return "\n".join(lines) + "\n"

304 

305 

306# ============================================================================ 

307# Global singleton 

308# ============================================================================ 

309 

# Shared collector instance; stays None until the first request.
_default_collector: Optional[MetricsCollector] = None
# Guards first-time creation of the shared collector.
_collector_lock = threading.Lock()


def get_metrics_collector(namespace: str = "") -> MetricsCollector:
    """Return the shared MetricsCollector, creating it on first use.

    ``namespace`` is only used by the call that creates the instance; an
    empty value falls back to ``"agentos"``. Later calls return the
    existing collector and ignore the argument.
    """
    global _default_collector
    if _default_collector is not None:
        return _default_collector
    with _collector_lock:
        # Re-check under the lock: another thread may have created the
        # collector between the test above and acquiring the lock.
        if _default_collector is None:
            chosen = namespace if namespace else "agentos"
            _default_collector = MetricsCollector(namespace=chosen)
    return _default_collector