Coverage for session_buddy / acb_cache_adapter.py: 72.25%

135 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1"""Async-native, ACB-backed cache adapters for session-mgmt-mcp. 

2 

3This module provides fully asynchronous cache adapters using aiocache, 

4leveraging ACB's underlying cache for optimized serialization and 

5lifecycle management. 

6""" 

7 

8import hashlib 

9import typing as t 

10from contextlib import suppress 

11from dataclasses import dataclass 

12 

13if t.TYPE_CHECKING: 

14 from aiocache import SimpleMemoryCache 

15 from aiocache.serializers import PickleSerializer 

16 from session_buddy.adapters.settings import CacheAdapterSettings 

17 

18try: 

19 from aiocache import SimpleMemoryCache 

20 from aiocache.serializers import PickleSerializer 

21 

22 AIOCACHE_AVAILABLE = True 

23except ImportError: 

24 AIOCACHE_AVAILABLE = False 

25 # Type stubs for when aiocache is not installed 

26 SimpleMemoryCache: t.Any = object # type: ignore[no-redef] 

27 PickleSerializer: t.Any = object # type: ignore[no-redef] 

28 

29 

@dataclass
class CacheStats:
    """Cache statistics for monitoring."""

    hits: int = 0
    misses: int = 0
    evictions: int = 0
    total_entries: int = 0

    @property
    def hit_rate(self) -> float:
        """Calculate cache hit rate percentage."""
        attempts = self.hits + self.misses
        if attempts == 0:
            return 0.0
        return self.hits / attempts * 100

    def to_dict(self) -> dict[str, t.Any]:
        """Convert stats to dictionary for reporting."""
        snapshot: dict[str, t.Any] = {
            "hits": self.hits,
            "misses": self.misses,
            "evictions": self.evictions,
            "total_entries": self.total_entries,
        }
        snapshot["hit_rate_percent"] = round(self.hit_rate, 2)
        return snapshot

54 

55 

56class ACBChunkCache: 

57 """Async-native, ACB-backed chunk cache for the token optimizer.""" 

58 

59 def __init__(self, ttl: int = 3600) -> None: 

60 """Initialize chunk cache. 

61 

62 Args: 

63 ttl: Default time-to-live in seconds (default: 1 hour) 

64 

65 """ 

66 if AIOCACHE_AVAILABLE: 66 ↛ 74line 66 didn't jump to line 74 because the condition on line 66 was always true

67 self._cache = SimpleMemoryCache( 

68 serializer=PickleSerializer(), 

69 namespace="session_mgmt:chunks:", 

70 ) 

71 self._cache.timeout = 0.0 # No operation timeout 

72 else: 

73 # Fallback when aiocache is not available 

74 self._cache = None 

75 self._ttl = ttl 

76 self.stats = CacheStats() 

77 

78 async def set(self, key: str, value: t.Any, ttl: int | None = None) -> None: 

79 """Store chunk data in cache asynchronously. 

80 

81 Args: 

82 key: Cache key 

83 value: Value to cache (ChunkResult) 

84 ttl: Optional TTL override in seconds 

85 

86 """ 

87 if self._cache is None: 87 ↛ 89line 87 didn't jump to line 89 because the condition on line 87 was never true

88 # Fallback when aiocache is not available 

89 return 

90 effective_ttl = ttl or self._ttl 

91 await self._cache.set(key, value, ttl=effective_ttl) 

92 self.stats.total_entries += 1 

93 

94 async def get(self, key: str) -> t.Any | None: 

95 """Retrieve chunk data from cache asynchronously. 

96 

97 Args: 

98 key: Cache key 

99 

100 Returns: 

101 Cached value or None if not found/expired 

102 

103 """ 

104 if self._cache is None: 104 ↛ 106line 104 didn't jump to line 106 because the condition on line 104 was never true

105 # Fallback when aiocache is not available 

106 self.stats.misses += 1 

107 return None 

108 result = await self._cache.get(key) 

109 if result is None: 109 ↛ 110line 109 didn't jump to line 110 because the condition on line 109 was never true

110 self.stats.misses += 1 

111 else: 

112 self.stats.hits += 1 

113 return result 

114 

115 async def delete(self, key: str) -> None: 

116 """Delete chunk data from cache asynchronously. 

117 

118 Args: 

119 key: Cache key to delete 

120 

121 """ 

122 if self._cache is not None: 

123 await self._cache.delete(key) 

124 self.stats.evictions += 1 

125 

126 async def clear(self) -> None: 

127 """Clear all cached chunk data asynchronously.""" 

128 if self._cache is not None: 128 ↛ 130line 128 didn't jump to line 130 because the condition on line 128 was always true

129 await self._cache.clear() 

130 self.stats = CacheStats() 

131 

132 async def __contains__(self, key: str) -> bool: 

133 """Check if key exists in cache asynchronously. 

134 

135 Args: 

136 key: Cache key to check 

137 

138 Returns: 

139 True if key exists and is not expired 

140 

141 """ 

142 if self._cache is None: 142 ↛ 143line 142 didn't jump to line 143 because the condition on line 142 was never true

143 return False 

144 result = await self._cache.exists(key) 

145 return bool(result) 

146 

147 async def __getitem__(self, key: str) -> t.Any: 

148 """Get item using dict syntax asynchronously. 

149 

150 Args: 

151 key: Cache key 

152 

153 Returns: 

154 Cached value 

155 

156 Raises: 

157 KeyError: If key not found in cache 

158 

159 """ 

160 result = await self.get(key) 

161 if result is None: 

162 raise KeyError(key) 

163 return result 

164 

165 async def __setitem__(self, key: str, value: t.Any) -> None: 

166 """Set item using dict syntax asynchronously. 

167 

168 Args: 

169 key: Cache key 

170 value: Value to cache 

171 

172 """ 

173 await self.set(key, value) 

174 

175 async def __delitem__(self, key: str) -> None: 

176 """Delete item using dict syntax asynchronously. 

177 

178 Args: 

179 key: Cache key to delete 

180 

181 """ 

182 await self.delete(key) 

183 

184 async def keys(self) -> list[str]: 

185 """Get all cache keys (not efficiently supported by SimpleMemoryCache).""" 

186 return [] 

187 

188 def get_stats(self) -> dict[str, t.Any]: 

189 """Get cache statistics.""" 

190 return {"chunk_cache": self.stats.to_dict()} 

191 

192 

193class ACBHistoryCache: 

194 """Async-native, ACB-backed history cache for analysis results.""" 

195 

196 def __init__(self, ttl: float = 300.0) -> None: 

197 """Initialize history cache. 

198 

199 Args: 

200 ttl: Time-to-live in seconds (default: 5 minutes) 

201 

202 """ 

203 if AIOCACHE_AVAILABLE: 203 ↛ 211line 203 didn't jump to line 211 because the condition on line 203 was always true

204 self._cache = SimpleMemoryCache( 

205 serializer=PickleSerializer(), 

206 namespace="session_mgmt:history:", 

207 ) 

208 self._cache.timeout = 0.0 

209 else: 

210 # Fallback when aiocache is not available 

211 self._cache = None 

212 self._ttl = int(ttl) 

213 self.stats = CacheStats() 

214 

215 def _generate_key(self, project: str, days: int) -> str: 

216 """Generate cache key from parameters.""" 

217 params = f"{project}:{days}" 

218 return hashlib.md5(params.encode(), usedforsecurity=False).hexdigest() 

219 

220 async def get(self, project: str, days: int) -> dict[str, t.Any] | None: 

221 """Retrieve cached analysis result asynchronously. 

222 

223 Args: 

224 project: Project name 

225 days: Number of days analyzed 

226 

227 Returns: 

228 Cached analysis dict or None if not found/expired 

229 

230 """ 

231 if self._cache is None: 231 ↛ 233line 231 didn't jump to line 233 because the condition on line 231 was never true

232 # Fallback when aiocache is not available 

233 self.stats.misses += 1 

234 return None 

235 key = self._generate_key(project, days) 

236 result: dict[str, t.Any] | None = await self._cache.get(key) 

237 if result is None: 

238 self.stats.misses += 1 

239 else: 

240 self.stats.hits += 1 

241 return result 

242 

243 async def set(self, project: str, days: int, data: dict[str, t.Any]) -> None: 

244 """Store analysis result in cache asynchronously. 

245 

246 Args: 

247 project: Project name 

248 days: Number of days analyzed 

249 data: Analysis result dictionary 

250 

251 """ 

252 if self._cache is not None: 252 ↛ 255line 252 didn't jump to line 255 because the condition on line 252 was always true

253 key = self._generate_key(project, days) 

254 await self._cache.set(key, data, ttl=self._ttl) 

255 self.stats.total_entries += 1 

256 

257 async def invalidate(self, project: str | None = None) -> None: 

258 """Invalidate cache entries asynchronously. 

259 

260 Args: 

261 project: Optional project name (if None, clears entire cache) 

262 

263 """ 

264 if project is None: 264 ↛ 269line 264 didn't jump to line 269 because the condition on line 264 was always true

265 if self._cache is not None: 265 ↛ 267line 265 didn't jump to line 267 because the condition on line 265 was always true

266 await self._cache.clear() 

267 self.stats = CacheStats() 

268 else: 

269 import warnings 

270 

271 warnings.warn( 

272 "ACB cache doesn't support selective invalidation by project. " 

273 "Use invalidate(None) to clear all cached data.", 

274 stacklevel=2, 

275 ) 

276 

277 async def size(self) -> int: 

278 """Get number of cached entries (approximate).""" 

279 return self.stats.total_entries 

280 

281 def get_stats(self) -> dict[str, int]: 

282 """Get cache statistics.""" 

283 return { 

284 "total_entries": self.stats.total_entries, 

285 "hits": self.stats.hits, 

286 "misses": self.stats.misses, 

287 "expired_entries": 0, 

288 "active_entries": self.stats.total_entries, 

289 } 

290 

291 

# Module-level singletons, created lazily by get_chunk_cache() /
# get_history_cache() and torn down by reset_caches().
_chunk_cache: ACBChunkCache | None = None
_history_cache: ACBHistoryCache | None = None

295 

296 

def _resolve_cache_settings() -> "CacheAdapterSettings":
    """Resolve cache settings from the DI container, or build defaults.

    Any failure during the DI lookup (or a wrongly-typed result) is
    swallowed and a default CacheAdapterSettings instance is returned.
    """
    from session_buddy.adapters.settings import CacheAdapterSettings
    from session_buddy.di.container import depends

    with suppress(Exception):
        candidate = depends.get_sync(CacheAdapterSettings)
        if isinstance(candidate, CacheAdapterSettings):
            return candidate
    return CacheAdapterSettings()

306 

307 

def get_chunk_cache(ttl: int | None = None) -> ACBChunkCache:
    """Get or create the global async chunk cache instance.

    Args:
        ttl: Optional TTL in seconds, applied only when the singleton is
            first created; ignored on later calls. Defaults to the
            configured ``chunk_cache_ttl_seconds``.

    Returns:
        The process-wide ACBChunkCache instance.

    """
    global _chunk_cache
    if _chunk_cache is None:
        # Resolve settings lazily: the DI lookup is only needed (and its
        # result only used) when the singleton is first constructed.
        settings = _resolve_cache_settings()
        effective_ttl = ttl if ttl is not None else settings.chunk_cache_ttl_seconds
        _chunk_cache = ACBChunkCache(ttl=effective_ttl)
    return _chunk_cache

316 

317 

def get_history_cache(ttl: float | None = None) -> ACBHistoryCache:
    """Get or create the global async history cache instance.

    Args:
        ttl: Optional TTL in seconds, applied only when the singleton is
            first created; ignored on later calls. Defaults to the
            configured ``history_cache_ttl_seconds``.

    Returns:
        The process-wide ACBHistoryCache instance.

    """
    global _history_cache
    if _history_cache is None:
        # Resolve settings lazily: the DI lookup is only needed (and its
        # result only used) when the singleton is first constructed.
        settings = _resolve_cache_settings()
        effective_ttl = ttl if ttl is not None else settings.history_cache_ttl_seconds
        _history_cache = ACBHistoryCache(ttl=effective_ttl)
    return _history_cache

326 

327 

async def reset_caches() -> None:
    """Reset global cache instances asynchronously.

    Clears any existing caches, then drops the module-level singletons so
    the next get_*_cache() call constructs fresh instances.
    """
    global _chunk_cache, _history_cache
    if _chunk_cache is not None:
        await _chunk_cache.clear()
    if _history_cache is not None:
        await _history_cache.invalidate()
    _chunk_cache = None
    _history_cache = None