Coverage for src / kemi / observability.py: 99%
177 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-05 15:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-05 15:47 +0000
"""Observability and metrics for kemi operations.

Provides Prometheus-compatible metrics for monitoring and debugging:
- Operation counters (remember, recall, forget, etc.)
- Latency histograms
- Embedding generation tracking
- Storage operation tracking
- Error counters

All metrics are collected in-memory with zero external dependencies.
Export formats: Prometheus text format, JSON, and Python dict.

Usage:
    from kemi.observability import MetricsCollector

    metrics = MetricsCollector()
    with metrics.track("remember"):
        memory.remember("user123", "content")
"""
21import logging
22import threading
23import time
24from dataclasses import dataclass, field
25from datetime import datetime, timezone
26from typing import Any
28logger = logging.getLogger(__name__)
@dataclass()
class MetricValue:
    """One observed metric sample together with its descriptive metadata.

    Fields:
        name: Identifier of the metric the sample belongs to.
        value: The sampled number.
        timestamp: Result of ``time.time()`` taken when the instance is
            created, unless a value is passed explicitly.
        labels: String key/value pairs attached to the sample. Every
            instance gets its own fresh dict when none is passed.
    """

    name: str
    value: float
    # default_factory (not a plain default) so the clock is read per instance.
    timestamp: float = field(default_factory=time.time)
    # default_factory avoids sharing one mutable dict between instances.
    labels: dict[str, str] = field(default_factory=dict)
class Counter:
    """Monotonically increasing counter.

    The exported metric name is ``f"{namespace}_{name}"``. Updates and reads
    of the value are serialized through an internal ``threading.Lock``.
    """

    def __init__(self, name: str, help_text: str = "", namespace: str = "kemi") -> None:
        """Create a counter starting at zero.

        Args:
            name: Metric name without the namespace prefix.
            help_text: Text emitted on the ``# HELP`` line.
            namespace: Prefix joined to ``name`` with an underscore.
        """
        self._name = f"{namespace}_{name}"
        self._help = help_text
        self._value: int = 0
        self._lock = threading.Lock()
        self._created = time.time()

    def inc(self, amount: int = 1) -> None:
        """Increase the counter by ``amount``.

        Args:
            amount: Non-negative increment; defaults to 1.

        Raises:
            ValueError: If ``amount`` is negative. A counter must never
                decrease, otherwise rate calculations over it are wrong.
        """
        if amount < 0:
            raise ValueError("Counters can only be incremented by non-negative amounts.")
        with self._lock:
            self._value += amount

    def value(self) -> int:
        """Return the current count."""
        with self._lock:
            return self._value

    def to_prometheus(self) -> str:
        """Render the counter as HELP, TYPE and sample lines joined by newlines."""
        # Read through the locked accessor so the export follows the same
        # locking discipline as inc() and value().
        current = self.value()
        return "\n".join(
            (
                f"# HELP {self._name} {self._help}",
                f"# TYPE {self._name} counter",
                f"{self._name} {current}",
            )
        )
class Histogram:
    """Histogram for tracking distributions (e.g., latency).

    Bucket counts are cumulative: an observation is counted in every bucket
    whose upper bound is greater than or equal to the observed value. The
    exported metric name is ``f"{namespace}_{name}"``.
    """

    _DEFAULT_BUCKETS = (
        0.001,
        0.005,
        0.01,
        0.025,
        0.05,
        0.1,
        0.25,
        0.5,
        1.0,
        2.5,
        5.0,
        10.0,
        30.0,
        60.0,
    )

    def __init__(
        self,
        name: str,
        help_text: str = "",
        namespace: str = "kemi",
        buckets: tuple[float, ...] = _DEFAULT_BUCKETS,
    ) -> None:
        """Create an empty histogram.

        Args:
            name: Metric name without the namespace prefix.
            help_text: Text emitted on the ``# HELP`` line.
            namespace: Prefix joined to ``name`` with an underscore.
            buckets: Upper bounds of the buckets. They are stored sorted in
                ascending order with duplicates removed.
        """
        self._name = f"{namespace}_{name}"
        self._help = help_text
        # Duplicate bounds would share one dict key and be incremented once
        # per occurrence in observe(); unsorted bounds would be exported out
        # of order. Normalizing here prevents both.
        self._buckets = tuple(sorted(set(buckets)))
        self._count: int = 0
        self._sum: float = 0.0
        self._bucket_counts: dict[float, int] = {b: 0 for b in self._buckets}
        self._lock = threading.Lock()
        self._created = time.time()

    def observe(self, value: float) -> None:
        """Record one observation.

        Args:
            value: The observed measurement, in the same unit as the bounds.
        """
        with self._lock:
            self._count += 1
            self._sum += value
            for bound in self._buckets:
                if value <= bound:
                    self._bucket_counts[bound] += 1

    def to_prometheus(self) -> str:
        """Render the histogram as newline-joined Prometheus-style text.

        Emits HELP and TYPE lines, one ``_bucket`` line per bound, the
        ``+Inf`` bucket, then ``_count`` and ``_sum`` (six decimal places).
        """
        # Snapshot under the lock so bucket counts, count and sum all
        # describe the same set of observations.
        with self._lock:
            total = self._count
            running_sum = self._sum
            per_bucket = [(bound, self._bucket_counts[bound]) for bound in self._buckets]

        lines = [f"# HELP {self._name} {self._help}", f"# TYPE {self._name} histogram"]
        lines.extend(
            f'{self._name}_bucket{{le="{bound}"}} {count}' for bound, count in per_bucket
        )
        lines.append(f'{self._name}_bucket{{le="+Inf"}} {total}')
        lines.append(f"{self._name}_count {total}")
        lines.append(f"{self._name}_sum {running_sum:.6f}")
        return "\n".join(lines)
class Gauge:
    """A metric holding a single number that may rise or fall.

    The exported metric name is ``f"{namespace}_{name}"``. Mutations and
    ``value()`` reads go through an internal ``threading.Lock``.
    """

    def __init__(self, name: str, help_text: str = "", namespace: str = "kemi") -> None:
        """Create a gauge initialised to ``0.0``."""
        self._lock = threading.Lock()
        self._value: float = 0.0
        self._help = help_text
        self._name = f"{namespace}_{name}"

    def set(self, value: float) -> None:
        """Overwrite the stored number with ``value``."""
        with self._lock:
            self._value = value

    def inc(self, amount: float = 1.0) -> None:
        """Add ``amount`` (default 1.0) to the stored number."""
        with self._lock:
            self._value = self._value + amount

    def dec(self, amount: float = 1.0) -> None:
        """Subtract ``amount`` (default 1.0) from the stored number."""
        with self._lock:
            self._value = self._value - amount

    def value(self) -> float:
        """Return the stored number."""
        with self._lock:
            current = self._value
        return current

    def to_prometheus(self) -> str:
        """Render HELP, TYPE and sample lines joined by newlines."""
        header = f"# HELP {self._name} {self._help}"
        kind = f"# TYPE {self._name} gauge"
        sample = f"{self._name} {self._value}"
        return "\n".join([header, kind, sample])
class MetricsCollector:
    """Central metrics collector for kemi operations.

    Tracks:
    - Operation counts and latencies
    - Embedding generation stats
    - Storage operation stats
    - Error counts
    - Memory usage stats (total memories, users, etc.)

    Every metric is a public attribute (``Counter``, ``Histogram`` or
    ``Gauge``) that callers may update directly.
    """

    # Operations that own a ``<operation>_total`` counter, in export order.
    _COUNTED_OPERATIONS = (
        "remember",
        "recall",
        "forget",
        "remember_many",
        "update",
        "prune",
        "migrate",
        "consolidate",
        "export",
        "import",
        "feedback",
    )
    # Operations that own a ``<operation>_latency`` histogram, in export order.
    _TIMED_OPERATIONS = (
        "remember",
        "recall",
        "forget",
        "remember_many",
        "embed",
        "store",
        "search",
    )
    # Operations that own a ``<operation>_errors_total`` counter.
    _ERROR_COUNTED_OPERATIONS = ("embed", "store")

    def __init__(self, namespace: str = "kemi") -> None:
        """Create all metrics, each prefixed with ``namespace``."""
        self._ns = namespace

        # Operation counters
        self.remember_total = Counter(
            "remember_total", "Total number of remember operations", namespace
        )
        self.recall_total = Counter(
            "recall_total", "Total number of recall operations", namespace
        )
        self.forget_total = Counter(
            "forget_total", "Total number of forget operations", namespace
        )
        self.remember_many_total = Counter(
            "remember_many_total", "Total number of batch remember operations", namespace
        )
        self.update_total = Counter(
            "update_total", "Total number of update operations", namespace
        )
        self.prune_total = Counter(
            "prune_total", "Total number of prune operations", namespace
        )
        self.migrate_total = Counter(
            "migrate_total", "Total number of migrate operations", namespace
        )
        self.consolidate_total = Counter(
            "consolidate_total", "Total number of consolidate operations", namespace
        )
        self.export_total = Counter(
            "export_total", "Total number of export operations", namespace
        )
        self.import_total = Counter(
            "import_total", "Total number of import operations", namespace
        )
        self.feedback_total = Counter(
            "feedback_total", "Total number of feedback operations", namespace
        )

        # Latency histograms
        self.remember_latency = Histogram(
            "remember_latency_seconds", "Latency of remember operations", namespace
        )
        self.recall_latency = Histogram(
            "recall_latency_seconds", "Latency of recall operations", namespace
        )
        self.forget_latency = Histogram(
            "forget_latency_seconds", "Latency of forget operations", namespace
        )
        self.remember_many_latency = Histogram(
            "remember_many_latency_seconds", "Latency of batch remember operations", namespace
        )
        self.embed_latency = Histogram(
            "embed_latency_seconds", "Latency of embedding generation", namespace
        )
        self.store_latency = Histogram(
            "store_latency_seconds", "Latency of storage operations", namespace
        )
        self.search_latency = Histogram(
            "search_latency_seconds", "Latency of vector search operations", namespace
        )

        # Embedding metrics
        self.embed_total = Counter("embed_total", "Total embeddings generated", namespace)
        self.embed_errors_total = Counter(
            "embed_errors_total", "Total embedding generation errors", namespace
        )
        self.embed_bytes_total = Counter(
            "embed_bytes_total", "Approximate total bytes embedded", namespace
        )

        # Storage metrics
        self.store_errors_total = Counter(
            "store_errors_total", "Total storage operation errors", namespace
        )

        # Duplicate and conflict metrics
        self.duplicates_detected = Counter(
            "duplicates_detected", "Total duplicates detected", namespace
        )
        self.conflicts_detected = Counter(
            "conflicts_detected", "Total conflicts detected", namespace
        )

        # Lifecycle metrics
        self.lifecycle_transitions = Counter(
            "lifecycle_transitions", "Total lifecycle state transitions", namespace
        )

        # Memory usage gauges
        self.total_memories = Gauge(
            "total_memories", "Current total number of memories", namespace
        )
        self.total_users = Gauge("total_users", "Current total number of users", namespace)

    def _counters(self) -> list[Counter]:
        """Return every counter in export order (operations first)."""
        counters = [getattr(self, f"{op}_total") for op in self._COUNTED_OPERATIONS]
        counters.extend(
            (
                self.embed_total,
                self.embed_errors_total,
                self.embed_bytes_total,
                self.store_errors_total,
                self.duplicates_detected,
                self.conflicts_detected,
                self.lifecycle_transitions,
            )
        )
        return counters

    def _histograms(self) -> list[Histogram]:
        """Return every latency histogram in export order."""
        return [getattr(self, f"{op}_latency") for op in self._TIMED_OPERATIONS]

    def _gauges(self) -> list[Gauge]:
        """Return every gauge in export order."""
        return [self.total_memories, self.total_users]

    def _start_timer(self) -> float:
        """Return the current monotonic clock reading."""
        return time.monotonic()

    def _stop_timer(self, start: float, histogram: Histogram) -> float:
        """Observe the time elapsed since ``start`` in ``histogram`` and return it."""
        duration = time.monotonic() - start
        histogram.observe(duration)
        return duration

    def track(self, operation: str) -> "_OperationTracker":
        """Start tracking an operation. Use as context manager.

        Example:
            with metrics.track("remember"):
                memory.remember("user123", "content")
        """
        return _OperationTracker(self, operation)

    def record_operation(
        self,
        operation: str,
        duration: float,
        success: bool = True,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Record a completed operation with timing.

        Args:
            operation: Operation name, e.g. ``"remember"`` or ``"embed"``.
            duration: Elapsed time to observe in the operation's latency
                histogram, when it has one.
            success: When ``False`` and the operation has a dedicated error
                counter (``embed``, ``store``), that counter is incremented.
            metadata: Accepted for interface compatibility; not recorded.

        The ``<operation>_total`` counter is incremented when one exists.
        ``embed_total`` is deliberately left alone: it counts embeddings, not
        embed calls, so the caller must add the real number itself.
        Operations matching no metric are logged at debug level and ignored.
        """
        counted = operation in self._COUNTED_OPERATIONS
        timed = operation in self._TIMED_OPERATIONS
        error_counted = operation in self._ERROR_COUNTED_OPERATIONS

        if not (counted or timed or error_counted):
            logger.debug("No metrics registered for operation %r; ignoring", operation)
            return

        if counted:
            getattr(self, f"{operation}_total").inc()
        if timed:
            getattr(self, f"{operation}_latency").observe(duration)
        if error_counted and not success:
            getattr(self, f"{operation}_errors_total").inc()

    def to_dict(self) -> dict[str, Any]:
        """Export all counter and gauge values as a Python dict.

        Histograms are not included. ``timestamp`` is the current UTC time in
        ISO 8601 format.
        """
        return {
            "operations": {
                op: getattr(self, f"{op}_total").value() for op in self._COUNTED_OPERATIONS
            },
            "embeddings": {
                "total": self.embed_total.value(),
                "errors": self.embed_errors_total.value(),
                "bytes_approx": self.embed_bytes_total.value(),
            },
            "storage": {
                "errors": self.store_errors_total.value(),
            },
            "quality": {
                "duplicates_detected": self.duplicates_detected.value(),
                "conflicts_detected": self.conflicts_detected.value(),
                "lifecycle_transitions": self.lifecycle_transitions.value(),
            },
            "memory_usage": {
                "total_memories": self.total_memories.value(),
                "total_users": self.total_users.value(),
            },
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

    def to_prometheus(self) -> str:
        """Export all metrics in Prometheus text format.

        Counters come first, then histograms, then gauges; metrics are
        separated by a blank line and the output ends with a newline.
        """
        everything = [*self._counters(), *self._histograms(), *self._gauges()]
        return "\n\n".join(metric.to_prometheus() for metric in everything) + "\n"

    def reset(self) -> None:
        """Reset all metrics to zero. Useful for testing."""
        for c in self._counters():
            with c._lock:
                c._value = 0

        for h in self._histograms():
            with h._lock:
                h._count = 0
                h._sum = 0.0
                h._bucket_counts = {b: 0 for b in h._buckets}

        for g in self._gauges():
            with g._lock:
                g._value = 0.0
# Maps an operation name to the name of the MetricsCollector attribute that
# holds its latency histogram. Every attribute is "<operation>_latency".
_HISTOGRAM_MAP: dict[str, str] = {
    operation: f"{operation}_latency"
    for operation in (
        "remember",
        "recall",
        "forget",
        "remember_many",
        "embed",
        "store",
        "search",
    )
}
class _OperationTracker:
    """Context manager for tracking operation duration.

    On exit the elapsed time is observed in the collector's latency histogram
    for the operation, looked up through ``_HISTOGRAM_MAP``. Operations with
    no histogram are still timed (see ``duration``) but are not recorded, so
    they cannot distort another operation's latency distribution.
    """

    def __init__(self, collector: MetricsCollector, operation: str) -> None:
        """Bind the tracker to ``collector`` for the named ``operation``."""
        self._collector = collector
        self._operation = operation
        self._start: float = 0.0
        # Explicit flag: a monotonic reading of 0.0 is a valid start time, so
        # the truthiness of _start cannot tell "started" from "not started".
        self._started = False
        # Final elapsed time, set once the block has exited.
        self._elapsed: float | None = None

    def __enter__(self) -> "_OperationTracker":
        """Start the timer and return the tracker."""
        self._elapsed = None
        self._start = self._collector._start_timer()
        self._started = True
        return self

    def __exit__(self, *args: Any) -> None:
        """Stop the timer and record the latency; never suppresses exceptions."""
        hist_name = _HISTOGRAM_MAP.get(self._operation)
        histogram = getattr(self._collector, hist_name, None) if hist_name else None
        if histogram is None:
            # Previously these fell back to embed_latency, which mixed
            # unrelated operations into the embedding latency histogram.
            self._elapsed = time.monotonic() - self._start
            logger.debug(
                "No latency histogram for operation %r; duration not recorded",
                self._operation,
            )
            return
        self._elapsed = self._collector._stop_timer(self._start, histogram)

    @property
    def duration(self) -> float:
        """Elapsed time of the tracked block.

        Returns 0.0 before the block is entered, the running time while
        inside it, and the final fixed duration after it has exited.
        """
        if self._elapsed is not None:
            return self._elapsed
        if self._started:
            return time.monotonic() - self._start
        return 0.0
# Process-wide singleton state used by get_metrics_collector() and
# reset_metrics().
# Serializes creation and replacement of the shared collector.
_collector_lock = threading.Lock()
# The shared collector; None until first requested or after a reset.
_global_collector: MetricsCollector | None = None
def get_metrics_collector() -> MetricsCollector:
    """Return the shared MetricsCollector, building it on first use.

    The module-level collector is checked once without the lock and, if it
    is still unset, checked again while holding ``_collector_lock`` before a
    new ``MetricsCollector()`` is created and stored.
    """
    global _global_collector
    collector = _global_collector
    if collector is None:
        with _collector_lock:
            # Re-read under the lock: another thread may have stored an
            # instance between the first check and lock acquisition.
            collector = _global_collector
            if collector is None:
                collector = MetricsCollector()
                _global_collector = collector
    return collector
def reset_metrics() -> None:
    """Discard the shared collector. Intended for tests.

    Sets the module-level collector back to ``None`` while holding
    ``_collector_lock``; the next ``get_metrics_collector()`` call creates a
    new instance. References handed out earlier are not affected.
    """
    global _global_collector
    with _collector_lock:
        _global_collector = None