Coverage for sentimatrix / core / cache.py: 84%

307 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-01-28 09:30 +0000

1""" 

2Sentimatrix Cache Module 

3 

4Provides caching functionality with multiple backends: 

5- Memory cache (default) 

6- Redis cache (distributed) 

7- SQLite cache (persistent) 

8 

9Example: 

10 >>> cache = CacheManager(CacheConfig(backend="memory")) 

11 >>> await cache.set("key", "value", ttl=3600) 

12 >>> value = await cache.get("key") 

13""" 

14 

15from __future__ import annotations 

16 

17import asyncio 

18import hashlib 

19import json 

20import pickle 

21import time 

22import zlib 

23from abc import ABC, abstractmethod 

24from collections import OrderedDict 

25from dataclasses import dataclass, field 

26from datetime import datetime 

27from typing import Any, Dict, Generic, List, Optional, TypeVar, Union 

28 

29from sentimatrix.core.config import CacheBackend, CacheConfig 

30from sentimatrix.core.exceptions import ( 

31 CacheConnectionError, 

32 CacheReadError, 

33 CacheSerializationError, 

34 CacheWriteError, 

35) 

36 

T = TypeVar("T")


@dataclass
class CacheEntry(Generic[T]):
    """A cached value together with its bookkeeping metadata.

    Timestamps are wall-clock seconds as returned by ``time.time()``.
    """

    # Key the entry is stored under.
    key: str
    # The cached payload.
    value: T
    # When the entry was created.
    created_at: float = field(default_factory=time.time)
    # Absolute expiry time, or None when the entry never expires.
    expires_at: Optional[float] = None
    # Number of times ``touch()`` has been called.
    access_count: int = 0
    # Time of creation or of the most recent ``touch()``.
    last_accessed: float = field(default_factory=time.time)

    @property
    def is_expired(self) -> bool:
        """Return True once the current time is past ``expires_at``.

        Entries without an expiry time never expire.
        """
        if self.expires_at is None:
            return False
        return time.time() > self.expires_at

    @property
    def ttl_remaining(self) -> Optional[float]:
        """Return the seconds left before expiry.

        Returns:
            None when the entry has no expiry, otherwise a non-negative
            float (``0.0`` once the entry has expired).
        """
        if self.expires_at is None:
            return None
        # Clamp with a float literal so the result is always a float, as the
        # annotation promises (``max(0, ...)`` would yield the int 0).
        return max(0.0, self.expires_at - time.time())

    def touch(self) -> None:
        """Record one access: bump the counter and refresh ``last_accessed``."""
        self.access_count += 1
        self.last_accessed = time.time()

70 

71 

@dataclass
class CacheStats:
    """Counters describing cache activity and current occupancy."""

    hits: int = 0
    misses: int = 0
    sets: int = 0
    deletes: int = 0
    evictions: int = 0
    total_entries: int = 0
    total_size_bytes: int = 0

    @property
    def hit_rate(self) -> float:
        """Fraction of lookups that were hits; 0.0 before any lookup."""
        lookups = self.hits + self.misses
        if lookups > 0:
            return self.hits / lookups
        return 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Return the counters as a plain dict, with ``hit_rate`` rounded to 4 places."""
        rounded_rate = round(self.hit_rate, 4)
        return dict(
            hits=self.hits,
            misses=self.misses,
            sets=self.sets,
            deletes=self.deletes,
            evictions=self.evictions,
            hit_rate=rounded_rate,
            total_entries=self.total_entries,
            total_size_bytes=self.total_size_bytes,
        )

102 

103 

class BaseCacheBackend(ABC):
    """Interface every cache backend must implement.

    All operations are coroutines; concrete backends override each one.
    """

    @abstractmethod
    async def get(self, key: str) -> Optional[Any]:
        """Look up ``key`` and return its value."""

    @abstractmethod
    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Store ``value`` under ``key``, optionally with a TTL."""

    @abstractmethod
    async def delete(self, key: str) -> bool:
        """Remove ``key``; return True if it was present."""

    @abstractmethod
    async def exists(self, key: str) -> bool:
        """Report whether ``key`` is present."""

    @abstractmethod
    async def clear(self, namespace: Optional[str] = None) -> int:
        """Clear the cache and return how many entries were cleared."""

    @abstractmethod
    async def get_stats(self) -> CacheStats:
        """Return the backend's statistics."""

    @abstractmethod
    async def close(self) -> None:
        """Close the connection and clean up resources."""

141 

142 

class MemoryCache(BaseCacheBackend):
    """
    In-memory cache with LRU eviction and TTL support.

    All state is guarded by a single ``asyncio.Lock``, so concurrent
    coroutines see consistent data. An asyncio lock does not protect
    against access from multiple OS threads.
    """

    def __init__(
        self,
        max_size: int = 1000,
        default_ttl: Optional[int] = 3600,
        compression: bool = False,
    ) -> None:
        """
        Initialize memory cache.

        Args:
            max_size: Maximum number of entries
            default_ttl: Default TTL in seconds (None for no expiry)
            compression: Enable value compression
        """
        self._max_size = max_size
        self._default_ttl = default_ttl
        self._compression = compression
        # Insertion order doubles as recency order: oldest entry first.
        self._cache: OrderedDict[str, CacheEntry] = OrderedDict()
        self._lock = asyncio.Lock()
        self._stats = CacheStats()

    async def get(self, key: str) -> Optional[Any]:
        """
        Get value by key, returns None if not found or expired.

        An expired entry is removed on access and counted as both a miss
        and an eviction.

        Raises:
            CacheSerializationError: If a compressed value cannot be
                decompressed/unpickled.
        """
        async with self._lock:
            entry = self._cache.get(key)

            if entry is None:
                self._stats.misses += 1
                return None

            if entry.is_expired:
                del self._cache[key]
                self._stats.misses += 1
                self._stats.evictions += 1
                # Keep the entry count in step with the removal.
                self._stats.total_entries = len(self._cache)
                return None

            # Move to end (most recently used)
            self._cache.move_to_end(key)
            entry.touch()
            self._stats.hits += 1

            value = entry.value
            if self._compression and isinstance(value, bytes):
                try:
                    value = pickle.loads(zlib.decompress(value))
                except Exception as e:
                    raise CacheSerializationError(
                        "memory", "decompress", str(e)
                    ) from e

            return value

    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """
        Set key-value pair with optional TTL.

        Args:
            key: Cache key
            value: Value to store (pickled and compressed when compression
                is enabled)
            ttl: TTL in seconds; None falls back to the default TTL. A
                resulting TTL of 0 or None stores the entry without expiry.

        Raises:
            CacheSerializationError: If the value cannot be pickled/compressed.
        """
        async with self._lock:
            # Compress if enabled
            stored_value = value
            if self._compression:
                try:
                    stored_value = zlib.compress(pickle.dumps(value))
                except Exception as e:
                    raise CacheSerializationError(
                        "memory", "compress", str(e)
                    ) from e

            # Calculate expiry time
            effective_ttl = ttl if ttl is not None else self._default_ttl
            expires_at = time.time() + effective_ttl if effective_ttl else None

            # Create or update entry
            entry = CacheEntry(
                key=key,
                value=stored_value,
                expires_at=expires_at,
            )

            # Evict if at capacity and adding new key
            if key not in self._cache and len(self._cache) >= self._max_size:
                await self._evict_lru()

            self._cache[key] = entry
            self._cache.move_to_end(key)
            self._stats.sets += 1
            self._stats.total_entries = len(self._cache)

    async def delete(self, key: str) -> bool:
        """Delete key. Returns True if key existed."""
        async with self._lock:
            if key in self._cache:
                del self._cache[key]
                self._stats.deletes += 1
                self._stats.total_entries = len(self._cache)
                return True
            return False

    async def exists(self, key: str) -> bool:
        """
        Check if key exists and is not expired.

        An expired entry is removed and counted as an eviction. Unlike
        ``get``, this does not touch hit/miss counters or recency order.
        """
        async with self._lock:
            entry = self._cache.get(key)
            if entry is None:
                return False
            if entry.is_expired:
                del self._cache[key]
                self._stats.evictions += 1
                # Keep the entry count in step with the removal.
                self._stats.total_entries = len(self._cache)
                return False
            return True

    async def clear(self, namespace: Optional[str] = None) -> int:
        """
        Clear cache, optionally filtered by namespace prefix.

        Args:
            namespace: When given, only keys starting with
                ``"<namespace>:"`` are removed; otherwise everything is.

        Returns:
            Number of entries removed
        """
        async with self._lock:
            if namespace is None:
                count = len(self._cache)
                self._cache.clear()
            else:
                prefix = f"{namespace}:"
                keys_to_delete = [k for k in self._cache if k.startswith(prefix)]
                count = len(keys_to_delete)
                for key in keys_to_delete:
                    del self._cache[key]

            self._stats.total_entries = len(self._cache)
            return count

    async def get_stats(self) -> CacheStats:
        """
        Get cache statistics.

        Refreshes the entry count and an approximate total size, then
        returns the cache's own (live) ``CacheStats`` object.
        """
        async with self._lock:
            self._stats.total_entries = len(self._cache)

            # Calculate approximate size
            total_size = 0
            for entry in self._cache.values():
                if isinstance(entry.value, bytes):
                    total_size += len(entry.value)
                else:
                    try:
                        total_size += len(pickle.dumps(entry.value))
                    except Exception:
                        # Best effort: unpicklable values simply do not
                        # contribute to the size estimate.
                        pass

            self._stats.total_size_bytes = total_size
            return self._stats

    async def close(self) -> None:
        """Close cache (no-op for memory cache)."""
        pass

    async def _evict_lru(self) -> None:
        """
        Make room for one new entry.

        Called from ``set`` while the lock is held. Drops up to half of the
        expired entries first (at least one when any exist), then removes
        least recently used entries until the cache is below ``max_size``.
        """
        if not self._cache:
            return

        # First try to evict expired entries
        expired_keys = [k for k, v in self._cache.items() if v.is_expired]
        for key in expired_keys[: max(1, len(expired_keys) // 2)]:
            del self._cache[key]
            self._stats.evictions += 1

        # If still at capacity, evict LRU. The emptiness check stops the
        # loop when max_size <= 0, where the size test alone stays true on
        # an empty cache and popitem() would raise KeyError.
        while self._cache and len(self._cache) >= self._max_size:
            self._cache.popitem(last=False)
            self._stats.evictions += 1

    async def get_many(self, keys: List[str]) -> Dict[str, Any]:
        """Get multiple values by keys; keys whose lookup returns None are omitted."""
        results = {}
        for key in keys:
            value = await self.get(key)
            if value is not None:
                results[key] = value
        return results

    async def set_many(
        self, items: Dict[str, Any], ttl: Optional[int] = None
    ) -> None:
        """Set multiple key-value pairs, all with the same TTL."""
        for key, value in items.items():
            await self.set(key, value, ttl)

    async def delete_many(self, keys: List[str]) -> int:
        """Delete multiple keys. Returns count of deleted keys."""
        deleted = 0
        for key in keys:
            if await self.delete(key):
                deleted += 1
        return deleted

335 

336 

class CacheManager:
    """
    High-level cache manager with namespace support and key hashing.

    Provides a unified interface for all cache backends. The backend is
    created lazily on first use; when caching is disabled in the config,
    every operation becomes a no-op that returns its "not found" result.
    """

    def __init__(self, config: Optional[CacheConfig] = None) -> None:
        """
        Initialize cache manager.

        Args:
            config: Cache configuration (a default ``CacheConfig`` is used
                when omitted)
        """
        self._config = config or CacheConfig()
        self._backend: Optional[BaseCacheBackend] = None
        self._initialized = False

    async def initialize(self) -> None:
        """
        Initialize cache backend based on configuration.

        Does nothing when already initialized. A disabled cache is marked
        initialized with no backend.

        Raises:
            NotImplementedError: For the Redis and SQLite backends.
            ValueError: For an unrecognized backend value.
        """
        if self._initialized:
            return

        if not self._config.enabled:
            self._backend = None
            self._initialized = True
            return

        if self._config.backend == CacheBackend.MEMORY:
            self._backend = MemoryCache(
                max_size=self._config.max_size,
                # A non-positive configured TTL means "no expiry".
                default_ttl=self._config.ttl if self._config.ttl > 0 else None,
                compression=self._config.compression,
            )
        elif self._config.backend == CacheBackend.REDIS:
            # Redis backend will be implemented later
            raise NotImplementedError("Redis backend not yet implemented")
        elif self._config.backend == CacheBackend.SQLITE:
            # SQLite backend will be implemented later
            raise NotImplementedError("SQLite backend not yet implemented")
        else:
            raise ValueError(f"Unknown cache backend: {self._config.backend}")

        self._initialized = True

    async def _ensure_initialized(self) -> None:
        """Ensure cache is initialized."""
        if not self._initialized:
            await self.initialize()

    def _make_key(self, key: str) -> str:
        """Create namespaced cache key of the form ``"<namespace>:<key>"``."""
        return f"{self._config.namespace}:{key}"

    @staticmethod
    def hash_key(key: str) -> str:
        """
        Create a hash of the key for use as cache key.

        Useful for long or complex keys.

        Returns:
            The first 32 hex characters of the key's SHA-256 digest
        """
        return hashlib.sha256(key.encode()).hexdigest()[:32]

    async def get(self, key: str, default: Any = None) -> Any:
        """
        Get value by key.

        Args:
            key: Cache key
            default: Default value if not found

        Returns:
            Cached value or default

        Raises:
            CacheReadError: If the backend lookup fails for any reason.
        """
        await self._ensure_initialized()

        if self._backend is None:
            return default

        try:
            full_key = self._make_key(key)
            value = await self._backend.get(full_key)
            return value if value is not None else default
        except Exception as e:
            # NOTE(review): the backend label is hard-coded to "memory";
            # revisit once other backends are implemented.
            raise CacheReadError("memory", key, str(e)) from e

    async def set(
        self, key: str, value: Any, ttl: Optional[int] = None
    ) -> None:
        """
        Set key-value pair.

        Args:
            key: Cache key
            value: Value to cache
            ttl: Time-to-live in seconds (None uses default)

        Raises:
            CacheWriteError: If the backend write fails for any reason.
        """
        await self._ensure_initialized()

        if self._backend is None:
            return

        try:
            full_key = self._make_key(key)
            await self._backend.set(full_key, value, ttl)
        except Exception as e:
            # NOTE(review): the backend label is hard-coded to "memory";
            # revisit once other backends are implemented.
            raise CacheWriteError("memory", key, str(e)) from e

    async def delete(self, key: str) -> bool:
        """
        Delete key from cache.

        Args:
            key: Cache key

        Returns:
            True if key was deleted, False if not found
        """
        await self._ensure_initialized()

        if self._backend is None:
            return False

        full_key = self._make_key(key)
        return await self._backend.delete(full_key)

    async def exists(self, key: str) -> bool:
        """
        Check if key exists in cache.

        Args:
            key: Cache key

        Returns:
            True if key exists and is not expired
        """
        await self._ensure_initialized()

        if self._backend is None:
            return False

        full_key = self._make_key(key)
        return await self._backend.exists(full_key)

    async def clear(self, namespace: Optional[str] = None) -> int:
        """
        Clear cache entries.

        Args:
            namespace: Optional namespace prefix to filter (uses configured namespace if None)

        Returns:
            Number of entries cleared
        """
        await self._ensure_initialized()

        if self._backend is None:
            return 0

        ns = namespace or self._config.namespace
        return await self._backend.clear(ns)

    async def get_or_set(
        self,
        key: str,
        default_factory: Any,
        ttl: Optional[int] = None,
    ) -> Any:
        """
        Get value or set if not exists.

        Args:
            key: Cache key
            default_factory: Value to cache if not found, or a callable
                (sync or async) producing it. Any awaitable returned by
                the callable is awaited before caching.
            ttl: Time-to-live in seconds

        Returns:
            Cached or newly set value
        """
        import inspect  # local import: only this method needs it

        value = await self.get(key)
        if value is not None:
            return value

        # Generate new value
        if callable(default_factory):
            value = default_factory()
            # Await whatever awaitable the factory produced. Checking the
            # result rather than the function also covers lambdas, partials
            # and callable objects that return coroutines, which would
            # otherwise be cached as un-awaited coroutine objects.
            if inspect.isawaitable(value):
                value = await value
        else:
            value = default_factory

        await self.set(key, value, ttl)
        return value

    async def get_stats(self) -> Optional[CacheStats]:
        """
        Get cache statistics.

        Returns:
            CacheStats object or None if cache is disabled
        """
        await self._ensure_initialized()

        if self._backend is None:
            return None

        return await self._backend.get_stats()

    async def close(self) -> None:
        """
        Close cache and cleanup resources.

        The manager may be used again afterwards; the next operation
        re-initializes it.
        """
        if self._backend is not None:
            await self._backend.close()
            # Drop the closed backend so a repeated close() does not close
            # it a second time; initialize() builds a fresh one on demand.
            self._backend = None
        self._initialized = False

    @property
    def enabled(self) -> bool:
        """Check if cache is enabled."""
        return self._config.enabled

    @property
    def backend_type(self) -> CacheBackend:
        """Get cache backend type."""
        return self._config.backend

    async def __aenter__(self) -> "CacheManager":
        """Async context manager entry."""
        await self.initialize()
        return self

    async def __aexit__(self, *args: Any) -> None:
        """Async context manager exit."""
        await self.close()

570 

571 

def cache_key(*args: Any, **kwargs: Any) -> str:
    """
    Generate a cache key from arguments.

    Useful for function memoization. Keyword arguments are order-independent.
    The key is derived from the ``repr`` of the arguments, so it is stable
    only for values whose ``repr`` is stable.

    Example:
        >>> key = cache_key("analyze", url="https://example.com", limit=100)
        >>> # Returns deterministic hash-based key

    Returns:
        The first 32 hex characters of a SHA-256 digest
    """
    # Hash a repr of the structured arguments instead of joining str() values
    # with ":". The joined form is ambiguous: ("a:b",) collides with
    # ("a", "b"), 1 with "1", and the positional string "x=1" with the
    # keyword x=1, so distinct calls would share one cache entry.
    # Keyword names are unique, so sorting the items never compares values.
    key_string = repr((args, tuple(sorted(kwargs.items()))))

    # Hash for consistent length
    return hashlib.sha256(key_string.encode()).hexdigest()[:32]

589 

590 

def cached(
    ttl: Optional[int] = 3600,
    key_prefix: str = "",
    cache_none: bool = False,
):
    """
    Decorator for caching the results of an async function.

    Each decorated function lazily gets its own ``CacheManager``, stored on
    the wrapper as ``_cache_manager``. Assigning that attribute beforehand
    makes the wrapper use the supplied manager instead.

    Args:
        ttl: Time-to-live in seconds
        key_prefix: Prefix for cache keys
        cache_none: Whether to cache None results

    Example:
        >>> @cached(ttl=3600, key_prefix="sentiment")
        ... async def analyze_sentiment(text: str) -> dict:
        ...     # Expensive operation
        ...     return result
    """
    import functools  # local import: only the decorator needs it

    # The cache reports a stored None exactly like a miss, so a None result
    # is stored as this marker instead and translated back on read. A
    # function that returns this exact string with cache_none=True would be
    # served as None.
    none_marker = "__sentimatrix:cached-none__"

    def decorator(func):
        # Preserve the wrapped function's name, docstring and signature.
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Get or create cache manager
            cache_manager = getattr(wrapper, "_cache_manager", None)
            if cache_manager is None:
                wrapper._cache_manager = CacheManager()
                await wrapper._cache_manager.initialize()
                cache_manager = wrapper._cache_manager

            # Generate cache key
            key = f"{key_prefix}:{func.__name__}:{cache_key(*args, **kwargs)}"

            # Try to get from cache
            cached_value = await cache_manager.get(key)
            if cached_value is not None:
                # The isinstance check keeps values with non-boolean
                # equality away from the string comparison.
                if (
                    cache_none
                    and isinstance(cached_value, str)
                    and cached_value == none_marker
                ):
                    return None
                return cached_value

            # Call function
            result = await func(*args, **kwargs)

            # Cache result
            if result is not None:
                await cache_manager.set(key, result, ttl)
            elif cache_none:
                await cache_manager.set(key, none_marker, ttl)

            return result

        return wrapper

    return decorator

638 return decorator