Coverage for src / kemi / operations / _ops_metrics.py: 58%

90 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-05 15:47 +0000

1"""Metrics, audit, and query-cache enable/disable operations.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6from contextlib import nullcontext 

7from typing import TYPE_CHECKING, Any 

8 

9if TYPE_CHECKING: 

10 from kemi._memory_impl import Memory 

11 

12logger = logging.getLogger(__name__) 

13 

14 

def latency_tracker(memory: "Memory", operation: str) -> Any:
    """Build the context manager used to time *operation*.

    When a metrics collector is attached to *memory*, the result of its
    ``track(operation)`` call is returned.  Without a collector a
    ``nullcontext()`` is returned, so callers can always write
    ``with latency_tracker(...)`` unconditionally.
    """
    collector = memory._metrics
    if collector is None:
        # Metrics disabled: hand back a do-nothing context manager.
        return nullcontext()
    return collector.track(operation)

20 

21 

def track_operation(memory: "Memory", operation: str, **details: Any) -> None:
    """Report one occurrence of *operation* to the metrics collector.

    Does nothing when ``memory._metrics`` is ``None``.  A collector exposing
    ``track_operation`` receives the operation name together with *details*;
    a collector that only exposes ``inc`` gets the ``"<operation>_total"``
    counter name instead.  ``AttributeError`` and ``TypeError`` raised by the
    collector are logged at debug level and not propagated.
    """
    collector = memory._metrics
    if collector is None:
        return
    try:
        if hasattr(collector, "track_operation"):
            collector.track_operation(operation, **details)
            return
        # Fallback for collectors that only offer a plain counter API.
        if hasattr(collector, "inc"):
            collector.inc(f"{operation}_total")
    except (AttributeError, TypeError):
        logger.debug("metrics.track_operation failed for %s", operation, exc_info=True)

33 

34 

35def track_operation_full( 

36 memory: "Memory", 

37 operation: str, 

38 user_id: str, 

39 details: dict[str, Any] | None, 

40 memory_id: str | None, 

41 namespace: str, 

42 status: str, 

43 audit_batch: list[dict[str, Any]] | None, 

44) -> None: 

45 """Track an operation in metrics and audit trail. 

46 

47 If *audit_batch* is provided, the audit entry is appended to that list 

48 instead of being written immediately. The caller is responsible for 

49 passing the list to ``AuditTrail.log_operation_batch``. 

50 """ 

51 if memory._metrics is not None: 

52 counter_name = f"{operation}_total" 

53 counter = getattr(memory._metrics, counter_name, None) 

54 if counter is not None: 

55 try: 

56 counter.inc(1) 

57 except (AttributeError, TypeError): 

58 pass 

59 if audit_batch is not None: 

60 audit_batch.append( 

61 { 

62 "user_id": user_id, 

63 "operation": operation, 

64 "details": details or {}, 

65 "memory_id": memory_id, 

66 "namespace": namespace, 

67 "status": status, 

68 } 

69 ) 

70 return 

71 if memory._audit_trail is not None: 

72 try: 

73 memory._audit_trail.log_operation( 

74 user_id=user_id, 

75 operation=operation, 

76 details=details or {}, 

77 memory_id=memory_id, 

78 namespace=namespace, 

79 status=status, 

80 ) 

81 except (AttributeError, TypeError, ValueError): 

82 # Broad catch: audit log can fail for many reasons (DB locked, 

83 # schema mismatch, IO error). Audit must never break the caller. 

84 logger.warning(f"Audit log failed for {operation}", exc_info=True) 

85 

86 

def record_embed_error(memory: "Memory") -> None:
    """Add one to the ``embed_errors_total`` counter, if there is one.

    Nothing happens when ``memory._metrics`` is ``None`` or the collector has
    no ``embed_errors_total`` attribute.  ``AttributeError`` and ``TypeError``
    raised while incrementing are ignored.
    """
    collector = memory._metrics
    if collector is None or not hasattr(collector, "embed_errors_total"):
        return
    try:
        collector.embed_errors_total.inc(1)
    except (AttributeError, TypeError):
        # Error counting is best-effort; an unusable counter is ignored.
        pass

94 

95 

def record_store_error(memory: "Memory") -> None:
    """Add one to the ``store_errors_total`` counter, if there is one.

    Nothing happens when ``memory._metrics`` is ``None`` or the collector has
    no ``store_errors_total`` attribute.  ``AttributeError`` and ``TypeError``
    raised while incrementing are ignored.
    """
    collector = memory._metrics
    if collector is None or not hasattr(collector, "store_errors_total"):
        return
    try:
        collector.store_errors_total.inc(1)
    except (AttributeError, TypeError):
        # Error counting is best-effort; an unusable counter is ignored.
        pass

103 

104 

105def get_metrics(memory: "Memory") -> dict[str, Any] | None: 

106 """Return current metrics snapshot as a dict, or None if disabled.""" 

107 if memory._metrics is None: 

108 return None 

109 try: 

110 if hasattr(memory._metrics, "to_dict"): 

111 return memory._metrics.to_dict() 

112 if hasattr(memory._metrics, "snapshot"): 

113 return memory._metrics.snapshot() 

114 except (AttributeError, TypeError): 

115 logger.debug("get_metrics failed", exc_info=True) 

116 return None 

117 

118 

119def get_metrics_prometheus(memory: "Memory") -> str | None: 

120 """Return metrics in Prometheus text format, or None if disabled.""" 

121 if memory._metrics is None: 

122 return None 

123 try: 

124 if hasattr(memory._metrics, "to_prometheus"): 

125 return memory._metrics.to_prometheus() 

126 except (AttributeError, TypeError): 

127 logger.debug("get_metrics_prometheus failed", exc_info=True) 

128 return None 

129 

130 

def enable_adaptive_retrieval(memory: "Memory", enable: bool = True) -> None:
    """Switch adaptive retrieval (per-user re-weighting of hybrid scores) on or off.

    With ``enable=False`` the retriever is detached by setting
    ``memory._adaptive_retriever`` to ``None``.  With ``enable=True`` a new
    ``AdaptiveRetriever`` is created and attached; if ``kemi.adaptive``
    cannot be imported, a warning is logged and *memory* is left unchanged.
    """
    if enable:
        try:
            # Imported lazily; an ImportError here only produces a warning.
            from kemi.adaptive import AdaptiveRetriever

            memory._adaptive_retriever = AdaptiveRetriever()
            logger.info("Adaptive retrieval enabled")
        except ImportError as e:
            logger.warning(f"Adaptive retrieval module not available: {e}")
    else:
        memory._adaptive_retriever = None

143 

144 

def enable_audit_trail(
    memory: "Memory",
    retention_days: int = 365,
    auto_purge: bool = True,
) -> None:
    """Attach an ``AuditTrail`` to *memory* for compliance logging.

    The trail is built on the connection returned by
    ``memory._store._get_connection()`` and receives *retention_days* and
    *auto_purge* unchanged.  If ``kemi.audit`` cannot be imported, or an
    ``AttributeError`` is raised along the way (for example a store without
    ``_get_connection``), a warning is logged and no trail is attached.
    """
    try:
        from kemi.audit import AuditTrail

        trail = AuditTrail(
            db_connection=memory._store._get_connection(),  # type: ignore[attr-defined]
            retention_days=retention_days,
            auto_purge=auto_purge,
        )
        memory._audit_trail = trail
    except (ImportError, AttributeError) as e:
        logger.warning(f"Audit trail not available: {e}")

162 

163 

def enable_query_cache(memory: "Memory", max_size: int = 128) -> None:
    """Attach an LRU cache for ``recall()`` results to *memory*.

    A new ``_QueryCache`` built with ``max_size=max_size`` is stored on
    ``memory._query_cache``, replacing whatever was there before.
    """
    # Imported at call time rather than at module import.
    from kemi.operations._query_cache import _QueryCache

    cache = _QueryCache(max_size=max_size)
    memory._query_cache = cache

169 

170 

def disable_query_cache(memory: "Memory") -> None:
    """Turn the query cache off.

    Any cache object currently attached is dropped by resetting
    ``memory._query_cache`` to ``None``.
    """
    memory._query_cache = None