Coverage for agentos/cache/response_cache.py: 44%
131 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2Response Cache with TTL — Cached LLM responses with configurable expiry.
4Supports in-memory LRU cache with TTL, disk persistence, and cache key
5strategies (exact match, semantic similarity, template-based).
6"""
8from __future__ import annotations
10import hashlib
11import json
12import time
13from collections import OrderedDict
14from dataclasses import dataclass, field
15from enum import Enum
16from typing import Any, Optional
class CacheKeyStrategy(Enum):
    """How a prompt is turned into a cache lookup key."""

    # Hash of the full prompt/message.
    EXACT = "exact"

    # Hash after whitespace/lowercase normalization.
    NORMALIZED = "normalized"

    # Hash of template name + variables (ignores phrasing variations).
    TEMPLATE = "template"
@dataclass
class CacheEntry:
    """A single cached value together with its bookkeeping data.

    Attributes:
        key: Cache key the entry is stored under.
        value: The cached payload.
        created_at: ``time.time()`` timestamp taken when the entry is built.
        ttl_seconds: Time-to-live in seconds. ``None`` or any value ``<= 0``
            means the entry never expires.
        hit_count: Number of cache hits recorded for this entry.
        last_accessed: Timestamp of the most recent access; ``0.0`` by default.
        metadata: Free-form extra data attached to the entry.
    """

    key: str
    value: Any
    created_at: float = field(default_factory=time.time)
    ttl_seconds: Optional[float] = 3600.0
    hit_count: int = 0
    last_accessed: float = 0.0
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def is_expired(self) -> bool:
        """Return True once the entry has outlived its TTL."""
        ttl = self.ttl_seconds
        # ``None`` is the documented "no expiry" value; comparing it with 0
        # would raise TypeError, so it is handled before the numeric check.
        if ttl is None or ttl <= 0:
            return False
        return self.age_seconds > ttl

    @property
    def age_seconds(self) -> float:
        """Return the number of seconds elapsed since ``created_at``."""
        return time.time() - self.created_at
@dataclass
class CacheStats:
    """Counters describing how a cache has been performing."""

    hits: int = 0
    misses: int = 0
    evictions: int = 0
    expirations: int = 0
    size: int = 0
    max_size: int = 0

    @property
    def hit_rate(self) -> float:
        """Fraction of lookups that were hits; 0.0 when nothing was looked up."""
        lookups = self.hits + self.misses
        if lookups > 0:
            return self.hits / lookups
        return 0.0

    @property
    def utilization(self) -> float:
        """Ratio of ``size`` to ``max_size``; 0.0 when ``max_size`` is not positive."""
        if self.max_size > 0:
            return self.size / self.max_size
        return 0.0
class ResponseCache:
    """
    Response cache with TTL and LRU eviction.

    Supports:
    - In-memory LRU cache with configurable TTL
    - Multiple cache key strategies (exact, normalized, template)
    - Statistics tracking (hit rate, evictions, expirations)
    - Optional disk persistence (planned)

    Example::

        cache = ResponseCache(max_entries=1000, default_ttl=3600)
        cache.put("What is 2+2?", "4")
        result = cache.get("What is 2+2?")  # "4" (cache hit)
    """

    def __init__(
        self,
        max_entries: int = 1000,
        default_ttl: float = 3600.0,
        key_strategy: CacheKeyStrategy = CacheKeyStrategy.EXACT,
    ):
        """
        Create an empty cache.

        Args:
            max_entries: Maximum number of entries kept before the least
                recently used ones are evicted.
            default_ttl: TTL in seconds applied when ``put`` gets no ``ttl``.
            key_strategy: How prompts are converted into cache keys.
        """
        self._max_entries = max_entries
        self._default_ttl = default_ttl
        self._key_strategy = key_strategy
        # Insertion/access order doubles as the LRU order: oldest first.
        self._store: OrderedDict[str, CacheEntry] = OrderedDict()
        self._stats = CacheStats(max_size=max_entries)

    def get(self, prompt: str, **context: Any) -> Optional[Any]:
        """
        Retrieve cached response for a prompt.

        Args:
            prompt: The prompt/message text.
            **context: Additional context for template-based keys.

        Returns:
            Cached value if found and not expired, else None. A cached value
            of None is therefore indistinguishable from a miss.
        """
        key = self._make_key(prompt, context)
        entry = self._store.get(key)

        if entry is None:
            self._stats.misses += 1
            return None

        if entry.is_expired:
            # TTL expiry is tracked separately from eviction, so the entry is
            # removed directly instead of through _evict(); this keeps the
            # counters consistent with clear_expired().
            del self._store[key]
            self._stats.size = len(self._store)
            self._stats.expirations += 1
            self._stats.misses += 1
            return None

        # Move to end for LRU
        self._store.move_to_end(key)
        entry.hit_count += 1
        entry.last_accessed = time.time()
        self._stats.hits += 1
        return entry.value

    def put(
        self,
        prompt: str,
        value: Any,
        ttl: Optional[float] = None,
        **context: Any,
    ) -> str:
        """
        Cache a response.

        Args:
            prompt: The prompt/message text.
            value: The response to cache.
            ttl: Custom TTL in seconds (default: self._default_ttl).
            **context: Additional context for template-based keys.

        Returns:
            The cache key string.
        """
        key = self._make_key(prompt, context)
        effective_ttl = ttl if ttl is not None else self._default_ttl

        self._store[key] = CacheEntry(
            key=key,
            value=value,
            ttl_seconds=effective_ttl,
            last_accessed=time.time(),
        )
        # Overwriting an existing key keeps its old position, so always mark
        # the entry as most recently used.
        self._store.move_to_end(key)

        # Evict least recently used entries while over capacity.
        while len(self._store) > self._max_entries:
            self._store.popitem(last=False)
            self._stats.evictions += 1

        # Recorded after eviction so the size reflects what is really stored.
        self._stats.size = len(self._store)
        return key

    def invalidate(self, prompt: str, **context: Any) -> bool:
        """Remove a specific cache entry. Returns True if found and removed."""
        key = self._make_key(prompt, context)
        if key in self._store:
            del self._store[key]
            self._stats.size = len(self._store)
            return True
        return False

    def clear(self) -> None:
        """Clear all cached entries."""
        self._store.clear()
        self._stats.size = 0

    def clear_expired(self) -> int:
        """Remove all expired entries. Returns count removed."""
        # Collect keys first: the dict must not change size while iterating.
        expired = [k for k, e in self._store.items() if e.is_expired]
        for k in expired:
            del self._store[k]
        self._stats.expirations += len(expired)
        self._stats.size = len(self._store)
        return len(expired)

    def get_stats(self) -> CacheStats:
        """Return the cache's statistics object with ``size`` refreshed.

        The returned object is the live instance used by the cache, not a
        copy, so later cache operations keep updating it.
        """
        self._stats.size = len(self._store)
        return self._stats

    def get_entry(self, prompt: str, **context: Any) -> Optional[CacheEntry]:
        """Get the full cache entry (including metadata) without updating LRU.

        No expiry check is made, so an expired entry may be returned.
        """
        key = self._make_key(prompt, context)
        return self._store.get(key)

    def _evict(self, key: str) -> None:
        """Evict a specific entry and count it as an eviction."""
        if key in self._store:
            del self._store[key]
            self._stats.evictions += 1
            self._stats.size = len(self._store)

    def _make_key(self, prompt: str, context: dict[str, Any]) -> str:
        """Generate a cache key based on the configured strategy.

        The key is the first 32 hex characters of a SHA-256 digest. Context
        values must be JSON-serializable.
        """
        if self._key_strategy == CacheKeyStrategy.NORMALIZED:
            # Lowercase and collapse every whitespace run to a single space.
            prompt = " ".join(prompt.lower().split())

        if self._key_strategy == CacheKeyStrategy.TEMPLATE:
            key_data = json.dumps({"template": prompt, "vars": context}, sort_keys=True)
            return hashlib.sha256(key_data.encode()).hexdigest()[:32]

        if context:
            # sort_keys makes the key independent of keyword argument order.
            prompt = prompt + json.dumps(context, sort_keys=True)

        return hashlib.sha256(prompt.encode()).hexdigest()[:32]

    @property
    def size(self) -> int:
        """Number of entries currently stored (expired ones included)."""
        return len(self._store)

    @property
    def is_full(self) -> bool:
        """True when the number of stored entries has reached ``max_entries``."""
        return len(self._store) >= self._max_entries

    def __contains__(self, prompt: str) -> bool:
        """True if a non-expired entry exists for ``prompt`` with no context."""
        key = self._make_key(prompt, {})
        entry = self._store.get(key)
        return entry is not None and not entry.is_expired

    def __len__(self) -> int:
        """Number of entries currently stored (expired ones included)."""
        return len(self._store)