Coverage for sentimatrix / core / cache.py: 84%
307 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-01-28 09:30 +0000
"""
Sentimatrix Cache Module

Provides caching functionality with multiple backends:
- Memory cache (default)
- Redis cache (distributed)
- SQLite cache (persistent)

Example:
    >>> cache = CacheManager(CacheConfig(backend="memory"))
    >>> await cache.set("key", "value", ttl=3600)
    >>> value = await cache.get("key")
"""
15from __future__ import annotations
17import asyncio
18import hashlib
19import json
20import pickle
21import time
22import zlib
23from abc import ABC, abstractmethod
24from collections import OrderedDict
25from dataclasses import dataclass, field
26from datetime import datetime
27from typing import Any, Dict, Generic, List, Optional, TypeVar, Union
29from sentimatrix.core.config import CacheBackend, CacheConfig
30from sentimatrix.core.exceptions import (
31 CacheConnectionError,
32 CacheReadError,
33 CacheSerializationError,
34 CacheWriteError,
35)
T = TypeVar("T")


@dataclass
class CacheEntry(Generic[T]):
    """A cached value together with its bookkeeping metadata.

    All timestamps are ``time.time()`` values (seconds since the epoch).
    """

    # Key the entry is stored under.
    key: str
    # The cached payload.
    value: T
    # Moment the entry was created.
    created_at: float = field(default_factory=time.time)
    # Absolute expiry timestamp; ``None`` means the entry never expires.
    expires_at: Optional[float] = None
    # Number of times ``touch`` has been called.
    access_count: int = 0
    # Timestamp of creation or of the most recent ``touch``.
    last_accessed: float = field(default_factory=time.time)

    @property
    def is_expired(self) -> bool:
        """Return True once the current time is past ``expires_at``.

        Entries without an expiry timestamp never expire.
        """
        return self.expires_at is not None and time.time() > self.expires_at

    @property
    def ttl_remaining(self) -> Optional[float]:
        """Return the seconds left before expiry.

        Returns:
            ``None`` when the entry has no expiry, otherwise the remaining
            time clamped at ``0.0`` so it is never negative.
        """
        if self.expires_at is None:
            return None
        # Clamp with a float literal so the result type matches the annotation.
        return max(0.0, self.expires_at - time.time())

    def touch(self) -> None:
        """Record an access: bump the counter and refresh ``last_accessed``."""
        self.access_count += 1
        self.last_accessed = time.time()
@dataclass
class CacheStats:
    """Counters describing cache activity and current contents."""

    # Lookups that returned a value.
    hits: int = 0
    # Lookups that found nothing (or only an expired entry).
    misses: int = 0
    # Number of write operations.
    sets: int = 0
    # Number of explicit deletions.
    deletes: int = 0
    # Entries removed because they expired or to make room.
    evictions: int = 0
    # Number of entries currently held.
    total_entries: int = 0
    # Approximate size of the stored values, in bytes.
    total_size_bytes: int = 0

    @property
    def hit_rate(self) -> float:
        """Return hits divided by total lookups, or 0.0 before any lookup."""
        lookups = self.hits + self.misses
        if lookups > 0:
            return self.hits / lookups
        return 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Return the counters as a plain dict.

        The dict also carries ``hit_rate`` rounded to four decimal places.
        """
        return {
            "hits": self.hits,
            "misses": self.misses,
            "sets": self.sets,
            "deletes": self.deletes,
            "evictions": self.evictions,
            "hit_rate": round(self.hit_rate, 4),
            "total_entries": self.total_entries,
            "total_size_bytes": self.total_size_bytes,
        }
class BaseCacheBackend(ABC):
    """Abstract base class for cache backends.

    Every method is an abstract coroutine; a concrete backend must
    implement all of them before it can be instantiated.
    """

    @abstractmethod
    async def get(self, key: str) -> Optional[Any]:
        """Get value by key."""

    @abstractmethod
    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Set key-value pair with optional TTL."""

    @abstractmethod
    async def delete(self, key: str) -> bool:
        """Delete key. Returns True if key existed."""

    @abstractmethod
    async def exists(self, key: str) -> bool:
        """Check if key exists."""

    @abstractmethod
    async def clear(self, namespace: Optional[str] = None) -> int:
        """Clear cache. Returns number of entries cleared."""

    @abstractmethod
    async def get_stats(self) -> "CacheStats":
        """Get cache statistics."""

    @abstractmethod
    async def close(self) -> None:
        """Close cache connection/cleanup resources."""
class MemoryCache(BaseCacheBackend):
    """
    In-memory cache with LRU eviction and TTL support.

    Entries are kept in an ``OrderedDict`` ordered from least to most
    recently used. Every public operation runs under one ``asyncio.Lock``,
    which serializes coroutines; ``asyncio.Lock`` is not a thread lock, so
    an instance should not be shared between threads.
    """

    def __init__(
        self,
        max_size: int = 1000,
        default_ttl: Optional[int] = 3600,
        compression: bool = False,
    ) -> None:
        """
        Initialize memory cache.

        Args:
            max_size: Maximum number of entries
            default_ttl: Default TTL in seconds (None for no expiry)
            compression: Enable value compression
        """
        self._max_size = max_size
        self._default_ttl = default_ttl
        self._compression = compression
        self._cache: OrderedDict[str, CacheEntry] = OrderedDict()
        self._lock = asyncio.Lock()
        self._stats = CacheStats()

    async def get(self, key: str) -> Optional[Any]:
        """Get value by key, returns None if not found or expired.

        An expired entry is removed on access and counted as both a miss
        and an eviction.

        Raises:
            CacheSerializationError: If a compressed value cannot be
                decompressed and unpickled.
        """
        async with self._lock:
            entry = self._cache.get(key)

            if entry is None:
                self._stats.misses += 1
                return None

            if entry.is_expired:
                del self._cache[key]
                self._stats.misses += 1
                self._stats.evictions += 1
                return None

            # Move to end (most recently used)
            self._cache.move_to_end(key)
            entry.touch()
            self._stats.hits += 1

            value = entry.value
            if self._compression and isinstance(value, bytes):
                try:
                    # The bytes being unpickled were produced by ``set`` on
                    # this same instance, not read from an external source.
                    value = pickle.loads(zlib.decompress(value))
                except Exception as e:
                    raise CacheSerializationError(
                        "memory", "decompress", str(e)
                    ) from e

            return value

    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Set key-value pair with optional TTL.

        Args:
            key: Cache key
            value: Value to store (pickled and compressed when compression
                is enabled)
            ttl: Time-to-live in seconds; None falls back to the default TTL

        Raises:
            CacheSerializationError: If the value cannot be pickled and
                compressed.
        """
        async with self._lock:
            # Compress if enabled
            stored_value = value
            if self._compression:
                try:
                    stored_value = zlib.compress(pickle.dumps(value))
                except Exception as e:
                    raise CacheSerializationError(
                        "memory", "compress", str(e)
                    ) from e

            # Calculate expiry time; a falsy TTL (None or 0) means no expiry
            effective_ttl = ttl if ttl is not None else self._default_ttl
            expires_at = time.time() + effective_ttl if effective_ttl else None

            # Create or update entry
            entry = CacheEntry(
                key=key,
                value=stored_value,
                expires_at=expires_at,
            )

            # Evict if at capacity and adding new key
            if key not in self._cache and len(self._cache) >= self._max_size:
                await self._evict_lru()

            self._cache[key] = entry
            self._cache.move_to_end(key)
            self._stats.sets += 1
            self._stats.total_entries = len(self._cache)

    async def delete(self, key: str) -> bool:
        """Delete key. Returns True if key existed."""
        async with self._lock:
            if key in self._cache:
                del self._cache[key]
                self._stats.deletes += 1
                self._stats.total_entries = len(self._cache)
                return True
            return False

    async def exists(self, key: str) -> bool:
        """Check if key exists and is not expired.

        An expired entry is removed and counted as an eviction. Unlike
        ``get``, this does not touch the hit/miss counters or LRU order.
        """
        async with self._lock:
            entry = self._cache.get(key)
            if entry is None:
                return False
            if entry.is_expired:
                del self._cache[key]
                self._stats.evictions += 1
                return False
            return True

    async def clear(self, namespace: Optional[str] = None) -> int:
        """Clear cache, optionally filtered by namespace prefix.

        Args:
            namespace: When given, only keys starting with
                ``"<namespace>:"`` are removed; otherwise everything is.

        Returns:
            Number of entries removed
        """
        async with self._lock:
            if namespace is None:
                count = len(self._cache)
                self._cache.clear()
            else:
                prefix = f"{namespace}:"
                keys_to_delete = [k for k in self._cache if k.startswith(prefix)]
                count = len(keys_to_delete)
                for key in keys_to_delete:
                    del self._cache[key]

            self._stats.total_entries = len(self._cache)
            return count

    async def get_stats(self) -> CacheStats:
        """Get cache statistics.

        Refreshes the entry count and the approximate byte size, then
        returns the live statistics object (not a copy).
        """
        async with self._lock:
            self._stats.total_entries = len(self._cache)

            # Calculate approximate size
            total_size = 0
            for entry in self._cache.values():
                if isinstance(entry.value, bytes):
                    total_size += len(entry.value)
                else:
                    try:
                        total_size += len(pickle.dumps(entry.value))
                    except Exception:
                        # Best effort: unpicklable values count as 0 bytes.
                        pass

            self._stats.total_size_bytes = total_size
            return self._stats

    async def close(self) -> None:
        """Close cache (no-op for memory cache)."""
        pass

    async def _evict_lru(self) -> None:
        """Make room for a new entry.

        Drops up to half (at least one) of the expired entries first, then
        removes least recently used entries while the cache is at or above
        capacity. Callers must already hold the lock.
        """
        if self._cache:
            # First try to evict expired entries
            expired_keys = [
                k for k, v in self._cache.items() if v.is_expired
            ]
            for key in expired_keys[:max(1, len(expired_keys) // 2)]:
                del self._cache[key]
                self._stats.evictions += 1

            # If still at capacity, evict LRU. The emptiness check stops
            # popitem() raising KeyError when max_size is zero or negative.
            while self._cache and len(self._cache) >= self._max_size:
                self._cache.popitem(last=False)
                self._stats.evictions += 1

    async def get_many(self, keys: List[str]) -> Dict[str, Any]:
        """Get multiple values by keys.

        Keys whose lookup returns None (missing, expired, or a stored
        None) are left out of the result.
        """
        results = {}
        for key in keys:
            value = await self.get(key)
            if value is not None:
                results[key] = value
        return results

    async def set_many(
        self, items: Dict[str, Any], ttl: Optional[int] = None
    ) -> None:
        """Set multiple key-value pairs, all with the same TTL."""
        for key, value in items.items():
            await self.set(key, value, ttl)

    async def delete_many(self, keys: List[str]) -> int:
        """Delete multiple keys. Returns count of deleted keys."""
        deleted = 0
        for key in keys:
            if await self.delete(key):
                deleted += 1
        return deleted
class CacheManager:
    """
    High-level cache manager with namespace support and key hashing.

    Provides a unified interface for all cache backends. The backend is
    created lazily on first use; when caching is disabled in the config,
    every operation becomes a no-op that returns its "not found" value.
    """

    def __init__(self, config: Optional[CacheConfig] = None) -> None:
        """
        Initialize cache manager.

        Args:
            config: Cache configuration (a default ``CacheConfig`` is used
                when omitted)
        """
        self._config = config or CacheConfig()
        self._backend: Optional[BaseCacheBackend] = None
        self._initialized = False

    async def initialize(self) -> None:
        """Initialize cache backend based on configuration.

        Does nothing when already initialized.

        Raises:
            NotImplementedError: For the Redis and SQLite backends.
            ValueError: For an unrecognized backend value.
        """
        if self._initialized:
            return

        if not self._config.enabled:
            self._backend = None
            self._initialized = True
            return

        if self._config.backend == CacheBackend.MEMORY:
            self._backend = MemoryCache(
                max_size=self._config.max_size,
                # A non-positive configured TTL means "no default expiry".
                default_ttl=self._config.ttl if self._config.ttl > 0 else None,
                compression=self._config.compression,
            )
        elif self._config.backend == CacheBackend.REDIS:
            # Redis backend will be implemented later
            raise NotImplementedError("Redis backend not yet implemented")
        elif self._config.backend == CacheBackend.SQLITE:
            # SQLite backend will be implemented later
            raise NotImplementedError("SQLite backend not yet implemented")
        else:
            raise ValueError(f"Unknown cache backend: {self._config.backend}")

        self._initialized = True

    async def _ensure_initialized(self) -> None:
        """Ensure cache is initialized."""
        if not self._initialized:
            await self.initialize()

    def _make_key(self, key: str) -> str:
        """Create namespaced cache key of the form ``<namespace>:<key>``."""
        return f"{self._config.namespace}:{key}"

    @staticmethod
    def hash_key(key: str) -> str:
        """
        Create a hash of the key for use as cache key.

        Useful for long or complex keys. Returns the first 32 hex
        characters of the key's SHA-256 digest.
        """
        return hashlib.sha256(key.encode()).hexdigest()[:32]

    async def get(self, key: str, default: Any = None) -> Any:
        """
        Get value by key.

        Args:
            key: Cache key
            default: Default value if not found

        Returns:
            Cached value or default (also returned when the stored value
            is None or the cache is disabled)

        Raises:
            CacheReadError: If the backend lookup fails.
        """
        await self._ensure_initialized()

        if self._backend is None:
            return default

        try:
            full_key = self._make_key(key)
            value = await self._backend.get(full_key)
            return value if value is not None else default
        except Exception as e:
            # NOTE(review): backend label is hard-coded to "memory" - revisit
            # once other backends are implemented.
            raise CacheReadError("memory", key, str(e)) from e

    async def set(
        self, key: str, value: Any, ttl: Optional[int] = None
    ) -> None:
        """
        Set key-value pair.

        Args:
            key: Cache key
            value: Value to cache
            ttl: Time-to-live in seconds (None uses default)

        Raises:
            CacheWriteError: If the backend write fails.
        """
        await self._ensure_initialized()

        if self._backend is None:
            return

        try:
            full_key = self._make_key(key)
            await self._backend.set(full_key, value, ttl)
        except Exception as e:
            # NOTE(review): backend label is hard-coded to "memory" - revisit
            # once other backends are implemented.
            raise CacheWriteError("memory", key, str(e)) from e

    async def delete(self, key: str) -> bool:
        """
        Delete key from cache.

        Args:
            key: Cache key

        Returns:
            True if key was deleted, False if not found
        """
        await self._ensure_initialized()

        if self._backend is None:
            return False

        full_key = self._make_key(key)
        return await self._backend.delete(full_key)

    async def exists(self, key: str) -> bool:
        """
        Check if key exists in cache.

        Args:
            key: Cache key

        Returns:
            True if key exists and is not expired
        """
        await self._ensure_initialized()

        if self._backend is None:
            return False

        full_key = self._make_key(key)
        return await self._backend.exists(full_key)

    async def clear(self, namespace: Optional[str] = None) -> int:
        """
        Clear cache entries.

        Args:
            namespace: Optional namespace prefix to filter (uses configured namespace if None)

        Returns:
            Number of entries cleared
        """
        await self._ensure_initialized()

        if self._backend is None:
            return 0

        ns = namespace or self._config.namespace
        return await self._backend.clear(ns)

    async def get_or_set(
        self,
        key: str,
        default_factory: Any,
        ttl: Optional[int] = None,
    ) -> Any:
        """
        Get value or set if not exists.

        Args:
            key: Cache key
            default_factory: Value to cache, or a callable (sync or async)
                that produces it; only invoked on a cache miss
            ttl: Time-to-live in seconds

        Returns:
            Cached or newly set value
        """
        import inspect  # local import keeps this block self-contained

        value = await self.get(key)
        if value is not None:
            return value

        # Generate new value
        if callable(default_factory):
            value = default_factory()
            # Await whatever awaitable the factory hands back. This covers
            # ``async def`` functions as well as partials or lambdas that
            # return a coroutine, which a coroutine-function check misses.
            if inspect.isawaitable(value):
                value = await value
        else:
            value = default_factory

        await self.set(key, value, ttl)
        return value

    async def get_stats(self) -> Optional[CacheStats]:
        """
        Get cache statistics.

        Returns:
            CacheStats object or None if cache is disabled
        """
        await self._ensure_initialized()

        if self._backend is None:
            return None

        return await self._backend.get_stats()

    async def close(self) -> None:
        """Close cache and cleanup resources.

        Marks the manager as uninitialized, so the next operation
        initializes it again.
        """
        if self._backend is not None:
            await self._backend.close()
        self._initialized = False

    @property
    def enabled(self) -> bool:
        """Check if cache is enabled."""
        return self._config.enabled

    @property
    def backend_type(self) -> CacheBackend:
        """Get cache backend type."""
        return self._config.backend

    async def __aenter__(self) -> "CacheManager":
        """Async context manager entry."""
        await self.initialize()
        return self

    async def __aexit__(self, *args: Any) -> None:
        """Async context manager exit."""
        await self.close()
def cache_key(*args: Any, **kwargs: Any) -> str:
    """
    Generate a cache key from arguments.

    Useful for function memoization. Positional arguments are rendered
    with ``str()``, keyword arguments as ``name=value`` in sorted name
    order, and the parts are joined with ``":"`` before hashing.

    Because the parts are plain strings joined by ``":"``, distinct calls
    can share a key (``cache_key("a:b")`` and ``cache_key("a", "b")``),
    and objects whose ``str()`` varies between runs give unstable keys.

    Returns:
        The first 32 hex characters of the SHA-256 digest.

    Example:
        >>> key = cache_key("analyze", url="https://example.com", limit=100)
        >>> # Returns deterministic hash-based key
    """
    # Serialize args and kwargs to string
    key_parts = [str(arg) for arg in args]
    key_parts.extend(f"{k}={v}" for k, v in sorted(kwargs.items()))
    key_string = ":".join(key_parts)

    # Hash for consistent length
    return hashlib.sha256(key_string.encode()).hexdigest()[:32]
def cached(
    ttl: Optional[int] = 3600,
    key_prefix: str = "",
    cache_none: bool = False,
):
    """
    Decorator for caching the results of an async function.

    Each decorated function gets its own ``CacheManager`` (default
    configuration), created on the first call and stored on the wrapper
    as ``_cache_manager``.

    Args:
        ttl: Time-to-live in seconds
        key_prefix: Prefix for cache keys
        cache_none: Whether to cache None results

    Example:
        >>> @cached(ttl=3600, key_prefix="sentiment")
        ... async def analyze_sentiment(text: str) -> dict:
        ...     # Expensive operation
        ...     return result
    """
    from functools import wraps  # local import keeps this block self-contained

    def decorator(func):
        # Preserve the wrapped function's name, docstring and metadata.
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Get or create cache manager
            cache_manager = getattr(wrapper, "_cache_manager", None)
            if cache_manager is None:
                wrapper._cache_manager = CacheManager()
                await wrapper._cache_manager.initialize()
                cache_manager = wrapper._cache_manager

            # Generate cache key
            key = f"{key_prefix}:{func.__name__}:{cache_key(*args, **kwargs)}"

            # Try to get from cache
            cached_value = await cache_manager.get(key)
            if cached_value is not None:
                return cached_value

            # A stored None looks the same as a miss to get(), so when None
            # results are cached, check for the key itself.
            if cache_none and await cache_manager.exists(key):
                return None

            # Call function
            result = await func(*args, **kwargs)

            # Cache result
            if result is not None or cache_none:
                await cache_manager.set(key, result, ttl)

            return result

        return wrapper

    return decorator