Coverage for src / kemi / observability.py: 99%

177 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-05 15:47 +0000

1"""Observability and metrics for kemi operations. 

2 

3Provides Prometheus-compatible metrics for monitoring and debugging: 

4- Operation counters (remember, recall, forget, etc.) 

5- Latency histograms 

6- Embedding generation tracking 

7- Storage operation tracking 

8- Error counters 

9 

10All metrics are collected in-memory with zero external dependencies. 

11Export formats: Prometheus text format, JSON, and Python dict. 

12 

13Usage: 

14 from kemi.observability import MetricsCollector 

15 

16 metrics = MetricsCollector() 

17 with metrics.track("remember"): 

18 memory.remember("user123", "content") 

19""" 

20 

21import logging 

22import threading 

23import time 

24from dataclasses import dataclass, field 

25from datetime import datetime, timezone 

26from typing import Any 

27 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

29 

30 

@dataclass
class MetricValue:
    """Snapshot of one metric reading together with its labels.

    Constructor order is ``name``, ``value``, ``timestamp``, ``labels``.
    """

    # Identifier of the metric this reading belongs to.
    name: str
    # The numeric reading itself.
    value: float
    # Seconds since the epoch; taken from time.time() at construction unless given.
    timestamp: float = field(default_factory=time.time)
    # Label mapping; every instance gets its own fresh dict.
    labels: dict[str, str] = field(default_factory=dict)

39 

40 

class Counter:
    """Monotonically increasing counter.

    The exported metric name is ``f"{namespace}_{name}"``. Every read and
    write of the value goes through an internal lock.
    """

    def __init__(self, name: str, help_text: str = "", namespace: str = "kemi") -> None:
        self._name = f"{namespace}_{name}"
        self._help = help_text
        self._value: int = 0
        self._lock = threading.Lock()
        self._created = time.time()

    def inc(self, amount: int = 1) -> None:
        """Increase the counter by ``amount``.

        Raises:
            ValueError: if ``amount`` is negative; a counter may never decrease.
        """
        if amount < 0:
            raise ValueError(
                f"Counter {self._name} can only increase; got amount={amount}"
            )
        with self._lock:
            self._value += amount

    def value(self) -> int:
        """Return the current count."""
        with self._lock:
            return self._value

    def to_prometheus(self) -> str:
        """Render this counter in the Prometheus text exposition format."""
        # Read under the lock, consistent with value().
        with self._lock:
            current = self._value
        # The exposition format requires backslashes and newlines in HELP text to be escaped.
        help_text = self._help.replace("\\", "\\\\").replace("\n", "\\n")
        lines = [
            f"# HELP {self._name} {help_text}",
            f"# TYPE {self._name} counter",
            f"{self._name} {current}",
        ]
        return "\n".join(lines)

63 

64 

class Histogram:
    """Histogram for tracking distributions (e.g., latency).

    Bucket counts are cumulative: an observation is counted in every bucket
    whose upper bound is >= the observed value. The ``+Inf`` bucket is
    emitted separately and always equals the total observation count.
    """

    _DEFAULT_BUCKETS = (
        0.001,
        0.005,
        0.01,
        0.025,
        0.05,
        0.1,
        0.25,
        0.5,
        1.0,
        2.5,
        5.0,
        10.0,
        30.0,
        60.0,
    )

    def __init__(
        self,
        name: str,
        help_text: str = "",
        namespace: str = "kemi",
        buckets: tuple[float, ...] = _DEFAULT_BUCKETS,
    ) -> None:
        self._name = f"{namespace}_{name}"
        self._help = help_text
        # Prometheus requires unique bucket bounds in increasing order. Duplicates
        # would also be double-counted in observe(). An explicit +Inf bound is
        # dropped because to_prometheus() always emits its own +Inf bucket.
        self._buckets = tuple(sorted({b for b in buckets if b != float("inf")}))
        self._count: int = 0
        self._sum: float = 0.0
        self._bucket_counts: dict[float, int] = {b: 0 for b in self._buckets}
        self._lock = threading.Lock()
        self._created = time.time()

    def observe(self, value: float) -> None:
        """Record one observation of ``value``."""
        with self._lock:
            self._count += 1
            self._sum += value
            for bound in self._buckets:
                if value <= bound:
                    self._bucket_counts[bound] += 1

    def to_prometheus(self) -> str:
        """Render this histogram in the Prometheus text exposition format."""
        # Snapshot everything under one lock acquisition. Otherwise a concurrent
        # observe() could make bucket counts disagree with the +Inf/_count lines.
        with self._lock:
            count = self._count
            total = self._sum
            bucket_counts = [(bound, self._bucket_counts[bound]) for bound in self._buckets]
        help_text = self._help.replace("\\", "\\\\").replace("\n", "\\n")
        lines = [f"# HELP {self._name} {help_text}", f"# TYPE {self._name} histogram"]
        lines.extend(
            f'{self._name}_bucket{{le="{bound}"}} {n}' for bound, n in bucket_counts
        )
        lines.append(f'{self._name}_bucket{{le="+Inf"}} {count}')
        lines.append(f"{self._name}_count {count}")
        lines.append(f"{self._name}_sum {total:.6f}")
        return "\n".join(lines)

118 

119 

class Gauge:
    """Gauge that can go up and down.

    The exported metric name is ``f"{namespace}_{name}"``. Every read and
    write of the value goes through an internal lock.
    """

    def __init__(self, name: str, help_text: str = "", namespace: str = "kemi") -> None:
        self._name = f"{namespace}_{name}"
        self._help = help_text
        self._value: float = 0.0
        self._lock = threading.Lock()

    def set(self, value: float) -> None:
        """Replace the current value with ``value``."""
        with self._lock:
            self._value = value

    def inc(self, amount: float = 1.0) -> None:
        """Add ``amount`` to the current value."""
        with self._lock:
            self._value += amount

    def dec(self, amount: float = 1.0) -> None:
        """Subtract ``amount`` from the current value."""
        with self._lock:
            self._value -= amount

    def value(self) -> float:
        """Return the current value."""
        with self._lock:
            return self._value

    def to_prometheus(self) -> str:
        """Render this gauge in the Prometheus text exposition format."""
        # Read under the lock, consistent with value().
        with self._lock:
            current = self._value
        # The exposition format requires backslashes and newlines in HELP text to be escaped.
        help_text = self._help.replace("\\", "\\\\").replace("\n", "\\n")
        lines = [
            f"# HELP {self._name} {help_text}",
            f"# TYPE {self._name} gauge",
            f"{self._name} {current}",
        ]
        return "\n".join(lines)

149 

150 

class MetricsCollector:
    """Central metrics collector for kemi operations.

    Tracks:
    - Operation counts and latencies
    - Embedding generation stats
    - Storage operation stats
    - Error counts
    - Memory usage stats (total memories, users, etc.)

    Each metric is a public attribute (e.g. ``remember_total``,
    ``recall_latency``, ``total_memories``). Its exported name is prefixed
    with ``namespace``.
    """

    def __init__(self, namespace: str = "kemi") -> None:
        self._ns = namespace

        # Operation counters
        self.remember_total = Counter(
            "remember_total",
            "Total number of remember operations",
            namespace,
        )
        self.recall_total = Counter(
            "recall_total",
            "Total number of recall operations",
            namespace,
        )
        self.forget_total = Counter(
            "forget_total",
            "Total number of forget operations",
            namespace,
        )
        self.remember_many_total = Counter(
            "remember_many_total",
            "Total number of batch remember operations",
            namespace,
        )
        self.update_total = Counter(
            "update_total",
            "Total number of update operations",
            namespace,
        )
        self.prune_total = Counter(
            "prune_total",
            "Total number of prune operations",
            namespace,
        )
        self.migrate_total = Counter(
            "migrate_total",
            "Total number of migrate operations",
            namespace,
        )
        self.consolidate_total = Counter(
            "consolidate_total",
            "Total number of consolidate operations",
            namespace,
        )
        self.export_total = Counter(
            "export_total",
            "Total number of export operations",
            namespace,
        )
        self.import_total = Counter(
            "import_total",
            "Total number of import operations",
            namespace,
        )
        self.feedback_total = Counter(
            "feedback_total",
            "Total number of feedback operations",
            namespace,
        )

        # Latency histograms
        self.remember_latency = Histogram(
            "remember_latency_seconds",
            "Latency of remember operations",
            namespace,
        )
        self.recall_latency = Histogram(
            "recall_latency_seconds",
            "Latency of recall operations",
            namespace,
        )
        self.forget_latency = Histogram(
            "forget_latency_seconds",
            "Latency of forget operations",
            namespace,
        )
        self.remember_many_latency = Histogram(
            "remember_many_latency_seconds",
            "Latency of batch remember operations",
            namespace,
        )
        self.embed_latency = Histogram(
            "embed_latency_seconds",
            "Latency of embedding generation",
            namespace,
        )
        self.store_latency = Histogram(
            "store_latency_seconds",
            "Latency of storage operations",
            namespace,
        )
        self.search_latency = Histogram(
            "search_latency_seconds",
            "Latency of vector search operations",
            namespace,
        )

        # Embedding metrics
        self.embed_total = Counter(
            "embed_total",
            "Total embeddings generated",
            namespace,
        )
        self.embed_errors_total = Counter(
            "embed_errors_total",
            "Total embedding generation errors",
            namespace,
        )
        self.embed_bytes_total = Counter(
            "embed_bytes_total",
            "Approximate total bytes embedded",
            namespace,
        )

        # Storage metrics
        self.store_errors_total = Counter(
            "store_errors_total",
            "Total storage operation errors",
            namespace,
        )

        # Duplicate and conflict metrics
        self.duplicates_detected = Counter(
            "duplicates_detected",
            "Total duplicates detected",
            namespace,
        )
        self.conflicts_detected = Counter(
            "conflicts_detected",
            "Total conflicts detected",
            namespace,
        )

        # Lifecycle metrics
        self.lifecycle_transitions = Counter(
            "lifecycle_transitions",
            "Total lifecycle state transitions",
            namespace,
        )

        # Memory usage gauges
        self.total_memories = Gauge(
            "total_memories",
            "Current total number of memories",
            namespace,
        )
        self.total_users = Gauge(
            "total_users",
            "Current total number of users",
            namespace,
        )

    def _counters(self) -> list[Counter]:
        """Return every counter in export order (shared by to_prometheus and reset)."""
        return [
            self.remember_total,
            self.recall_total,
            self.forget_total,
            self.remember_many_total,
            self.update_total,
            self.prune_total,
            self.migrate_total,
            self.consolidate_total,
            self.export_total,
            self.import_total,
            self.feedback_total,
            self.embed_total,
            self.embed_errors_total,
            self.embed_bytes_total,
            self.store_errors_total,
            self.duplicates_detected,
            self.conflicts_detected,
            self.lifecycle_transitions,
        ]

    def _histograms(self) -> list[Histogram]:
        """Return every latency histogram in export order."""
        return [
            self.remember_latency,
            self.recall_latency,
            self.forget_latency,
            self.remember_many_latency,
            self.embed_latency,
            self.store_latency,
            self.search_latency,
        ]

    def _gauges(self) -> list[Gauge]:
        """Return every gauge in export order."""
        return [
            self.total_memories,
            self.total_users,
        ]

    def _start_timer(self) -> float:
        """Return a monotonic start timestamp for a later _stop_timer call."""
        return time.monotonic()

    def _stop_timer(self, start: float, histogram: Histogram) -> float:
        """Observe the seconds elapsed since ``start`` in ``histogram`` and return them."""
        duration = time.monotonic() - start
        histogram.observe(duration)
        return duration

    def track(self, operation: str) -> "_OperationTracker":
        """Start tracking an operation. Use as context manager.

        Example:
            with metrics.track("remember"):
                memory.remember("user123", "content")
        """
        return _OperationTracker(self, operation)

    def record_operation(
        self,
        operation: str,
        duration: float,
        success: bool = True,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Record a completed operation with timing.

        Metrics are located by this collector's naming convention. Any metric
        that does not exist for ``operation`` is skipped:

        - ``{operation}_total`` counter: incremented for every recorded call,
          whether it succeeded or failed.
        - ``{operation}_latency`` histogram: observes ``duration``.
        - ``{operation}_errors_total`` counter: incremented when ``success``
          is False (e.g. ``embed_errors_total``, ``store_errors_total``).

        Args:
            operation: Operation name, e.g. "remember", "recall", "embed", "store".
            duration: Elapsed time of the operation, in seconds.
            success: Whether the operation completed without error.
            metadata: Currently unused; accepted for forward compatibility.
        """
        counter = getattr(self, f"{operation}_total", None)
        if isinstance(counter, Counter):
            counter.inc()

        histogram = getattr(self, f"{operation}_latency", None)
        if isinstance(histogram, Histogram):
            histogram.observe(duration)

        if not success:
            errors = getattr(self, f"{operation}_errors_total", None)
            if isinstance(errors, Counter):
                errors.inc()

    def to_dict(self) -> dict[str, Any]:
        """Export all metrics as a Python dict."""
        return {
            "operations": {
                "remember": self.remember_total.value(),
                "recall": self.recall_total.value(),
                "forget": self.forget_total.value(),
                "remember_many": self.remember_many_total.value(),
                "update": self.update_total.value(),
                "prune": self.prune_total.value(),
                "migrate": self.migrate_total.value(),
                "consolidate": self.consolidate_total.value(),
                "export": self.export_total.value(),
                "import": self.import_total.value(),
                "feedback": self.feedback_total.value(),
            },
            "embeddings": {
                "total": self.embed_total.value(),
                "errors": self.embed_errors_total.value(),
                "bytes_approx": self.embed_bytes_total.value(),
            },
            "storage": {
                "errors": self.store_errors_total.value(),
            },
            "quality": {
                "duplicates_detected": self.duplicates_detected.value(),
                "conflicts_detected": self.conflicts_detected.value(),
                "lifecycle_transitions": self.lifecycle_transitions.value(),
            },
            "memory_usage": {
                "total_memories": self.total_memories.value(),
                "total_users": self.total_users.value(),
            },
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

    def to_prometheus(self) -> str:
        """Export all metrics in Prometheus text format.

        Counters come first, then histograms, then gauges. Metric families are
        separated by a blank line, and the output ends with a newline.
        """
        families = (*self._counters(), *self._histograms(), *self._gauges())
        return "\n\n".join(metric.to_prometheus() for metric in families) + "\n"

    def reset(self) -> None:
        """Reset all metrics to zero. Useful for testing."""
        for c in self._counters():
            with c._lock:
                c._value = 0

        for h in self._histograms():
            with h._lock:
                h._count = 0
                h._sum = 0.0
                h._bucket_counts = {b: 0 for b in h._buckets}

        for g in self._gauges():
            with g._lock:
                g._value = 0.0

468 

469 

470_HISTOGRAM_MAP: dict[str, str] = { 

471 "remember": "remember_latency", 

472 "recall": "recall_latency", 

473 "forget": "forget_latency", 

474 "remember_many": "remember_many_latency", 

475 "embed": "embed_latency", 

476 "store": "store_latency", 

477 "search": "search_latency", 

478} 

479 

480 

481class _OperationTracker: 

482 """Context manager for tracking operation duration.""" 

483 

484 def __init__(self, collector: MetricsCollector, operation: str) -> None: 

485 self._collector = collector 

486 self._operation = operation 

487 self._start: float = 0.0 

488 

489 def __enter__(self) -> "_OperationTracker": 

490 self._start = self._collector._start_timer() 

491 return self 

492 

493 def __exit__(self, *args: Any) -> None: 

494 hist_name = _HISTOGRAM_MAP.get(self._operation, "embed_latency") 

495 histogram = getattr(self._collector, hist_name, self._collector.embed_latency) 

496 self._collector._stop_timer(self._start, histogram) 

497 

498 @property 

499 def duration(self) -> float: 

500 if self._start: 

501 return time.monotonic() - self._start 

502 return 0.0 

503 

504 

# Global singleton for convenience: built lazily by get_metrics_collector()
# and cleared by reset_metrics(); both take _collector_lock when mutating it.
_collector_lock = threading.Lock()
_global_collector: MetricsCollector | None = None

508 

509 

def get_metrics_collector() -> MetricsCollector:
    """Return the process-wide MetricsCollector, constructing it on first use.

    The common path, where the collector already exists, does not take the
    lock. Creation is re-checked under ``_collector_lock`` so that only one
    instance is built.
    """
    global _global_collector
    collector = _global_collector
    if collector is None:
        with _collector_lock:
            if _global_collector is None:
                _global_collector = MetricsCollector()
            collector = _global_collector
    return collector

521 

522 

def reset_metrics() -> None:
    """Discard the global collector so the next lookup builds a fresh one. For testing.

    Code that already holds a reference to the old collector keeps using it.
    """
    global _global_collector
    _collector_lock.acquire()
    try:
        _global_collector = None
    finally:
        _collector_lock.release()