Coverage for src/kemi/operations/_ops_metrics.py: 58%
90 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-05 15:47 +0000
1"""Metrics, audit, and query-cache enable/disable operations."""
3from __future__ import annotations
5import logging
6from contextlib import nullcontext
7from typing import TYPE_CHECKING, Any
9if TYPE_CHECKING:
10 from kemi._memory_impl import Memory
12logger = logging.getLogger(__name__)
def latency_tracker(memory: "Memory", operation: str) -> Any:
    """Give back a context manager for timing *operation*.

    When ``memory._metrics`` is set, whatever its ``track(operation)`` call
    returns is handed back.  When it is ``None``, a do-nothing
    ``nullcontext()`` is returned so callers can use ``with`` unconditionally.
    """
    collector = memory._metrics
    if collector is None:
        return nullcontext()
    return collector.track(operation)
def track_operation(memory: "Memory", operation: str, **details: Any) -> None:
    """Report one occurrence of *operation* to the metrics collector.

    Nothing happens when ``memory._metrics`` is ``None``.  A collector that
    exposes ``track_operation`` receives the operation name plus *details*;
    failing that, a collector that exposes ``inc`` is asked to increment
    ``"<operation>_total"``.  An ``AttributeError`` or ``TypeError`` raised
    along the way is logged at debug level and swallowed.
    """
    collector = memory._metrics
    if collector is None:
        return
    try:
        if hasattr(collector, "track_operation"):
            collector.track_operation(operation, **details)
            return
        if hasattr(collector, "inc"):
            collector.inc(f"{operation}_total")
    except (AttributeError, TypeError):
        logger.debug("metrics.track_operation failed for %s", operation, exc_info=True)
35def track_operation_full(
36 memory: "Memory",
37 operation: str,
38 user_id: str,
39 details: dict[str, Any] | None,
40 memory_id: str | None,
41 namespace: str,
42 status: str,
43 audit_batch: list[dict[str, Any]] | None,
44) -> None:
45 """Track an operation in metrics and audit trail.
47 If *audit_batch* is provided, the audit entry is appended to that list
48 instead of being written immediately. The caller is responsible for
49 passing the list to ``AuditTrail.log_operation_batch``.
50 """
51 if memory._metrics is not None:
52 counter_name = f"{operation}_total"
53 counter = getattr(memory._metrics, counter_name, None)
54 if counter is not None:
55 try:
56 counter.inc(1)
57 except (AttributeError, TypeError):
58 pass
59 if audit_batch is not None:
60 audit_batch.append(
61 {
62 "user_id": user_id,
63 "operation": operation,
64 "details": details or {},
65 "memory_id": memory_id,
66 "namespace": namespace,
67 "status": status,
68 }
69 )
70 return
71 if memory._audit_trail is not None:
72 try:
73 memory._audit_trail.log_operation(
74 user_id=user_id,
75 operation=operation,
76 details=details or {},
77 memory_id=memory_id,
78 namespace=namespace,
79 status=status,
80 )
81 except (AttributeError, TypeError, ValueError):
82 # Broad catch: audit log can fail for many reasons (DB locked,
83 # schema mismatch, IO error). Audit must never break the caller.
84 logger.warning(f"Audit log failed for {operation}", exc_info=True)
def record_embed_error(memory: "Memory") -> None:
    """Bump ``embed_errors_total`` by one on the metrics collector.

    Returns without doing anything when ``memory._metrics`` is ``None`` or
    has no ``embed_errors_total`` attribute.  An ``AttributeError`` or
    ``TypeError`` raised by the increment is ignored.
    """
    collector = memory._metrics
    if collector is None or not hasattr(collector, "embed_errors_total"):
        return
    try:
        collector.embed_errors_total.inc(1)
    except (AttributeError, TypeError):
        pass
def record_store_error(memory: "Memory") -> None:
    """Bump ``store_errors_total`` by one on the metrics collector.

    Returns without doing anything when ``memory._metrics`` is ``None`` or
    has no ``store_errors_total`` attribute.  An ``AttributeError`` or
    ``TypeError`` raised by the increment is ignored.
    """
    collector = memory._metrics
    if collector is None or not hasattr(collector, "store_errors_total"):
        return
    try:
        collector.store_errors_total.inc(1)
    except (AttributeError, TypeError):
        pass
105def get_metrics(memory: "Memory") -> dict[str, Any] | None:
106 """Return current metrics snapshot as a dict, or None if disabled."""
107 if memory._metrics is None:
108 return None
109 try:
110 if hasattr(memory._metrics, "to_dict"):
111 return memory._metrics.to_dict()
112 if hasattr(memory._metrics, "snapshot"):
113 return memory._metrics.snapshot()
114 except (AttributeError, TypeError):
115 logger.debug("get_metrics failed", exc_info=True)
116 return None
119def get_metrics_prometheus(memory: "Memory") -> str | None:
120 """Return metrics in Prometheus text format, or None if disabled."""
121 if memory._metrics is None:
122 return None
123 try:
124 if hasattr(memory._metrics, "to_prometheus"):
125 return memory._metrics.to_prometheus()
126 except (AttributeError, TypeError):
127 logger.debug("get_metrics_prometheus failed", exc_info=True)
128 return None
def enable_adaptive_retrieval(memory: "Memory", enable: bool = True) -> None:
    """Switch adaptive retrieval (re-weights hybrid scores per user) on or off.

    With a falsy *enable*, ``memory._adaptive_retriever`` is reset to ``None``.
    Otherwise ``kemi.adaptive.AdaptiveRetriever`` is imported on demand and a
    new instance is stored on ``memory``.  If that raises ``ImportError``, a
    warning is logged and ``memory`` is left as it was.
    """
    if enable:
        try:
            from kemi.adaptive import AdaptiveRetriever

            memory._adaptive_retriever = AdaptiveRetriever()
            logger.info("Adaptive retrieval enabled")
        except ImportError as exc:
            logger.warning(f"Adaptive retrieval module not available: {exc}")
    else:
        memory._adaptive_retriever = None
def enable_audit_trail(
    memory: "Memory",
    retention_days: int = 365,
    auto_purge: bool = True,
) -> None:
    """Turn on the audit trail used for compliance logging.

    ``kemi.audit.AuditTrail`` is imported on demand and constructed with the
    connection obtained from ``memory._store._get_connection()`` together with
    *retention_days* and *auto_purge*; the instance is stored as
    ``memory._audit_trail``.  An ``ImportError`` or ``AttributeError`` raised
    along the way is logged as a warning and ``memory`` is left unchanged.
    """
    try:
        from kemi.audit import AuditTrail

        trail = AuditTrail(
            db_connection=memory._store._get_connection(),  # type: ignore[attr-defined]
            retention_days=retention_days,
            auto_purge=auto_purge,
        )
        memory._audit_trail = trail
    except (ImportError, AttributeError) as exc:
        logger.warning(f"Audit trail not available: {exc}")
def enable_query_cache(memory: "Memory", max_size: int = 128) -> None:
    """Attach an LRU cache for ``recall()`` results to *memory*.

    A new ``_QueryCache`` built with ``max_size=max_size`` is stored as
    ``memory._query_cache``, replacing whatever was there before.
    """
    from kemi.operations._query_cache import _QueryCache

    cache = _QueryCache(max_size=max_size)
    memory._query_cache = cache
def disable_query_cache(memory: "Memory") -> None:
    """Turn the query cache off by resetting ``memory._query_cache`` to ``None``."""
    setattr(memory, "_query_cache", None)