Coverage for agentos/memory/pager.py: 28%

192 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2Virtual Memory Pager — Letta-style context memory swapping. 

3 

4Inspired by OS virtual memory: when the agent's working context fills up, 

5pages out old/low-importance episodic memories to a persistent swap store, 

6and intelligently pages them back in when relevant to the current task. 

7 

8Architecture: 

9 L1 (in-memory) ──page_out──→ SwapStore (disk/DB) 

10 SwapStore ──page_in───→ L1 (promoted back) 

11 

12Key features: 

13- Importance-weighted eviction: least important + oldest first 

14- Smart recall: keyword-based search in swap for context-relevant pages 

15- Page compaction: batch-compress multiple items into summary pages 

16- Statistics: track page hits/misses for tuning 

17""" 

18 

from __future__ import annotations

import inspect
import json
import logging
import os
import string
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable, Optional

from agentos.memory.pyramid import MemoryItem, MemoryPyramid, MemoryType, MemoryLayer

29 

30 

31# ── Data Structures ────────────────────────────────────────────── 

32 

@dataclass
class MemoryPage:
    """A batch of evicted memories persisted together, analogous to a VM page.

    ``items`` holds serialized ``MemoryItem`` dicts; the remaining fields are
    page-level metadata used for searching and bookkeeping.
    """

    id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
    items: list[dict[str, Any]] = field(default_factory=list)  # serialized MemoryItem dicts
    summary: str = ""  # optional summary of the page contents
    keywords: list[str] = field(default_factory=list)
    importance_avg: float = 0.0
    item_count: int = 0
    evicted_at: float = field(default_factory=time.time)
    evicted_from: str = "l1"  # layer the items were evicted from

    def to_dict(self) -> dict[str, Any]:
        """Return every field as a JSON-ready dict (list values are not copied)."""
        field_names = (
            "id",
            "items",
            "summary",
            "keywords",
            "importance_avg",
            "item_count",
            "evicted_at",
            "evicted_from",
        )
        return {name: getattr(self, name) for name in field_names}

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "MemoryPage":
        """Rebuild a page from ``to_dict`` output, filling in any missing key."""
        fallbacks: dict[str, Any] = {
            "id": uuid.uuid4().hex[:12],
            "items": [],
            "summary": "",
            "keywords": [],
            "importance_avg": 0.0,
            "item_count": 0,
            "evicted_at": time.time(),
            "evicted_from": "l1",
        }
        return cls(**{name: d.get(name, fallback) for name, fallback in fallbacks.items()})

62 

63 

@dataclass
class PagerStats:
    """Running counters describing pager activity, used for tuning."""

    total_page_outs: int = 0
    total_page_ins: int = 0
    total_items_evicted: int = 0
    total_items_recalled: int = 0
    page_hits: int = 0  # page-in lookups that found at least one page
    page_misses: int = 0  # page-in lookups that found no page
    last_page_out_at: float = 0.0
    last_page_in_at: float = 0.0

    @property
    def hit_rate(self) -> float:
        """Fraction of page-in lookups that were hits; 0.0 before any lookup."""
        lookups = self.page_hits + self.page_misses
        if lookups <= 0:
            return 0.0
        return self.page_hits / lookups

80 

81 

82# ── Swap Store Backend ──────────────────────────────────────────── 

83 

class SwapStore:
    """
    Persistent storage for paged-out memories.

    Default: file-based JSON store. Can be swapped for SQLite/Postgres.

    Pages are kept in memory, and the whole store is rewritten to ``path`` after
    every mutation. Each write goes to a sibling ``.tmp`` file that is then
    renamed over ``path``, so an interrupted write cannot truncate the existing
    store. Persistence is best-effort: I/O and serialization failures are logged
    and the in-memory state remains authoritative.
    """

    def __init__(self, path: str = ""):
        self.path = path or self._default_path()
        self._pages: dict[str, MemoryPage] = {}
        self._keyword_index: dict[str, set[str]] = {}  # lowercase keyword -> page ids
        parent = os.path.dirname(self.path)
        # A bare filename has no directory part, and os.makedirs("") would raise.
        if parent:
            os.makedirs(parent, exist_ok=True)
        self._load()

    @staticmethod
    def _default_path() -> str:
        return os.path.join(os.path.expanduser("~"), ".agentos", "memory_swap.json")

    def store(self, page: MemoryPage) -> None:
        """Add a page (or replace the page with the same id) and persist."""
        previous = self._pages.get(page.id)
        if previous is not None:
            # Drop the replaced page's keywords so they no longer match it.
            self._unindex(page.id, previous.keywords)
        self._pages[page.id] = page
        self._index(page.id, page.keywords)
        self._flush()

    def search(self, query_keywords: list[str], limit: int = 5) -> list[MemoryPage]:
        """Keyword-based search for relevant pages.

        Each query keyword that matches one of a page's keywords (case-insensitive)
        adds 1 to that page's score. Pages are ranked by score, then by
        ``importance_avg``, both descending, and at most ``limit`` are returned.
        """
        scored: dict[str, float] = {}
        for kw in query_keywords:
            for page_id in self._keyword_index.get(kw.lower(), ()):
                if page_id in self._pages:  # skip ids whose page no longer exists
                    scored[page_id] = scored.get(page_id, 0) + 1.0

        ranked = sorted(
            scored,
            key=lambda pid: (scored[pid], self._pages[pid].importance_avg),
            reverse=True,
        )
        return [self._pages[pid] for pid in ranked[:limit]]

    def get(self, page_id: str) -> Optional[MemoryPage]:
        return self._pages.get(page_id)

    def remove(self, page_id: str) -> bool:
        """Delete a page and persist; return True if the page existed."""
        page = self._pages.pop(page_id, None)
        if page is None:
            return False
        self._unindex(page_id, page.keywords)
        self._flush()
        return True

    def list_all(self) -> list[MemoryPage]:
        return list(self._pages.values())

    def clear(self) -> None:
        self._pages.clear()
        self._keyword_index.clear()
        self._flush()

    def _index(self, page_id: str, keywords: list[str]) -> None:
        """Register ``page_id`` under each keyword, lowercased."""
        for kw in keywords:
            self._keyword_index.setdefault(kw.lower(), set()).add(page_id)

    def _unindex(self, page_id: str, keywords: list[str]) -> None:
        """Remove ``page_id`` from the index and drop entries that become empty."""
        for kw in keywords:
            key = kw.lower()
            ids = self._keyword_index.get(key)
            if ids is None:
                # Already removed, e.g. "Python" and "python" share one entry.
                continue
            ids.discard(page_id)
            if not ids:
                del self._keyword_index[key]

    def _flush(self) -> None:
        """Atomically persist all pages; failures are logged, never raised."""
        tmp_path = f"{self.path}.tmp"
        try:
            payload = {
                "pages": {pid: page.to_dict() for pid, page in self._pages.items()},
                # Still written for file-format compatibility; _load rebuilds it.
                "keyword_index": {kw: list(ids) for kw, ids in self._keyword_index.items()},
            }
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(payload, f, indent=2)
            os.replace(tmp_path, self.path)
        except (OSError, TypeError, ValueError, AttributeError):
            logging.getLogger(__name__).warning(
                "Failed to persist memory swap to %s", self.path, exc_info=True
            )
            try:
                os.remove(tmp_path)
            except OSError:
                pass

    def _load(self) -> None:
        """Load pages from ``path`` if it exists; start empty if it is unreadable."""
        if not os.path.exists(self.path):
            return
        try:
            with open(self.path, encoding="utf-8") as f:
                data = json.load(f)
            pages = {pid: MemoryPage.from_dict(raw) for pid, raw in data.get("pages", {}).items()}
            # Rebuild the index from the pages rather than trusting the persisted
            # copy, so the index can never reference a missing page.
            index: dict[str, set[str]] = {}
            for pid, page in pages.items():
                for kw in page.keywords:
                    index.setdefault(kw.lower(), set()).add(pid)
        except (OSError, ValueError, TypeError, AttributeError):
            logging.getLogger(__name__).warning(
                "Ignoring unreadable memory swap file %s", self.path, exc_info=True
            )
            self._pages = {}
            self._keyword_index = {}
            return
        self._pages = pages
        self._keyword_index = index

168 

169 

170# ── Memory Pager ────────────────────────────────────────────────── 

171 

class MemoryPager:
    """
    Virtual memory pager for Agent context.

    Pages out old/less-important memories to a swap store when the context is
    full, and pages them back in when their keywords match the current task.

    Usage:
        pager = MemoryPager(pyramid, summarizer_fn=my_summarizer)
        paged = await pager.page_out(fill_ratio=0.8)  # no-op below 0.7
        recalled = await pager.page_in(["python", "error", "debug"])
    """

    def __init__(
        self,
        pyramid: MemoryPyramid,
        summarizer_fn: Optional[Callable] = None,
        swap_store: Optional[SwapStore] = None,
        max_pages: int = 500,
        page_size: int = 10,  # items per page
        eviction_ratio: float = 0.3,  # share of eviction candidates evicted at full pressure
    ):
        """
        Args:
            pyramid: Memory store whose EPISODIC and WORKING maps are paged.
            summarizer_fn: Optional callable ``(text) -> summary``. It may be a
                plain function or a coroutine function.
            swap_store: Backing store. Defaults to a file-backed ``SwapStore``.
            max_pages: Maximum number of pages kept in swap. After each page-out,
                the least important, oldest pages beyond this cap are discarded.
                A value ``<= 0`` disables the cap.
            page_size: Number of items batched into each page. Values below 1
                act as 1.
            eviction_ratio: Fraction of eviction candidates paged out when the
                context is completely full. It is scaled linearly from 0 at 70%
                fill to full strength at 100% fill.
        """
        self.pyramid = pyramid
        self.summarizer = summarizer_fn
        self.swap = swap_store if swap_store is not None else SwapStore()
        self.max_pages = max_pages
        self.page_size = page_size
        self.eviction_ratio = eviction_ratio
        self.stats = PagerStats()

    async def page_out(self, fill_ratio: float = 0.85) -> int:
        """
        Evict low-importance episodic+working memories to swap.

        Called when context is approaching the token limit.

        Args:
            fill_ratio: Current context fill ratio (0-1). Below 0.7 nothing is evicted.

        Returns:
            Number of items evicted
        """
        if fill_ratio < 0.7:
            return 0  # Not full enough yet

        candidates = self._eviction_candidates()
        if not candidates:
            return 0

        # The fuller the context, the larger the share of candidates evicted.
        scale = min(1.0, (fill_ratio - 0.7) / 0.3)
        to_evict_count = max(1, int(len(candidates) * self.eviction_ratio * scale))

        # Least important first; among equal importance, oldest first.
        candidates.sort(key=lambda kv: (kv[1].importance, kv[1].created_at))
        to_evict = candidates[:to_evict_count]

        step = max(1, self.page_size)  # range() rejects a step of 0
        pages_created = 0
        for start in range(0, len(to_evict), step):
            page = await self._create_page(to_evict[start:start + step])
            self.swap.store(page)
            pages_created += 1

        self._enforce_max_pages()

        self.stats.total_page_outs += pages_created
        self.stats.total_items_evicted += len(to_evict)
        self.stats.last_page_out_at = time.time()

        return len(to_evict)

    async def page_in(self, query_keywords: list[str], limit: int = 3) -> list[MemoryItem]:
        """
        Search swap for relevant memories and promote them back to L1.

        Restored items are placed in the EPISODIC map under the pyramid key they
        were evicted from. Each recalled page is then removed from swap. Entries
        that fail to deserialize are skipped and are lost with their page.

        Args:
            query_keywords: Keywords to search for (e.g., current task description)
            limit: Max pages to recall

        Returns:
            List of MemoryItem that were restored
        """
        pages = self.swap.search(query_keywords, limit=limit)

        if not pages:
            self.stats.page_misses += 1
            return []

        self.stats.page_hits += 1
        self.stats.last_page_in_at = time.time()

        episodic = self.pyramid._memories[MemoryType.EPISODIC]
        restored_items: list[MemoryItem] = []
        for page in pages:
            for item_dict in page.items:
                try:
                    item = MemoryItem.from_dict(item_dict)
                    key = item_dict.get("metadata", {}).get("key", item.id)
                except Exception:
                    # Best-effort: one corrupt entry must not block the rest.
                    logging.getLogger(__name__).debug(
                        "Skipping undeserializable item in page %s", page.id, exc_info=True
                    )
                    continue
                item.layer = MemoryLayer.L1
                episodic[key] = item
                self.pyramid._index[key] = item
                restored_items.append(item)
            # The page's contents are back in memory, so drop it from swap.
            self.swap.remove(page.id)

        self.stats.total_page_ins += len(pages)
        self.stats.total_items_recalled += len(restored_items)

        return restored_items

    def _eviction_candidates(self) -> list[tuple[str, MemoryItem]]:
        """Return (key, item) pairs eligible for eviction.

        Eligible items are all episodic items plus working items accessed fewer
        than 3 times.
        """
        memories = self.pyramid._memories
        candidates = list(memories[MemoryType.EPISODIC].items())
        candidates.extend(
            (key, item)
            for key, item in memories[MemoryType.WORKING].items()
            if item.access_count < 3  # keep frequently used working memories resident
        )
        return candidates

    def _enforce_max_pages(self) -> None:
        """Discard the least important, oldest swap pages beyond ``max_pages``."""
        if self.max_pages <= 0:
            return
        pages = self.swap.list_all()
        excess = len(pages) - self.max_pages
        if excess <= 0:
            return
        pages.sort(key=lambda p: (p.importance_avg, p.evicted_at))
        for page in pages[:excess]:
            self.swap.remove(page.id)

    @staticmethod
    def _extract_keywords(item: MemoryItem) -> set[str]:
        """Return the search keywords for one item.

        Keywords come from two sources, all lowercased:
        - alphabetic words longer than 3 characters among the first 20 words of
          string content, with surrounding punctuation stripped;
        - string metadata values shorter than 50 characters.
        """
        keywords: set[str] = set()
        if isinstance(item.content, str):
            for raw in item.content.lower().split()[:20]:
                word = raw.strip(string.punctuation)
                if len(word) > 3 and word.isalpha():
                    keywords.add(word)
        for value in item.metadata.values():
            if isinstance(value, str) and len(value) < 50:
                keywords.add(value.lower())
        return keywords

    async def _create_page(self, items: list[tuple[str, MemoryItem]]) -> MemoryPage:
        """Serialize a batch of items into a new page and remove them from the pyramid.

        Each item's pyramid key is stored in its serialized metadata under "key",
        so that ``page_in`` can restore the item under the same key.
        """
        page = MemoryPage()
        keywords: set[str] = set()
        total_imp = 0.0

        for key, item in items:
            page.items.append({**item.to_dict(), "metadata": {**item.metadata, "key": key}})
            keywords |= self._extract_keywords(item)
            total_imp += item.importance

            # Remove from pyramid
            self.pyramid._memories[item.type].pop(key, None)
            self.pyramid._index.pop(key, None)

        page.item_count = len(items)
        page.importance_avg = total_imp / len(items) if items else 0.0
        page.keywords = sorted(keywords)  # deterministic order in the swap file

        # Generate a summary if a summarizer was supplied (sync or async).
        if self.summarizer and page.items:
            try:
                contents = "\n".join(
                    i.get("content", "") for i in page.items if isinstance(i.get("content"), str)
                )[:2000]
                summary = self.summarizer(contents)
                if inspect.isawaitable(summary):
                    summary = await summary
                page.summary = summary
            except Exception:
                page.summary = f"{len(items)} memories, avg importance {page.importance_avg:.2f}"

        return page

    def get_stats(self) -> dict[str, Any]:
        """Get comprehensive pager statistics."""
        return {
            "total_page_outs": self.stats.total_page_outs,
            "total_page_ins": self.stats.total_page_ins,
            "total_items_evicted": self.stats.total_items_evicted,
            "total_items_recalled": self.stats.total_items_recalled,
            "page_hits": self.stats.page_hits,
            "page_misses": self.stats.page_misses,
            "hit_rate": f"{self.stats.hit_rate:.1%}",
            "swap_pages_stored": len(self.swap.list_all()),
            "last_page_out": self.stats.last_page_out_at,
            "last_page_in": self.stats.last_page_in_at,
        }

350 

351 

352# ── Loop Integration Helper ──────────────────────────────────────── 

353 

def create_paging_callback(pager: MemoryPager) -> Callable:
    """
    Build the coroutine function used as the agent loop's auto-paging hook.

    The returned ``auto_paging(usage_ratio)`` passes the context usage ratio
    to ``pager.page_out`` and returns the number of items it evicted.

    Usage:
        pager = MemoryPager(pyramid)
        loop.set_auto_paging(create_paging_callback(pager))
    """

    async def auto_paging(usage_ratio: float) -> int:
        return await pager.page_out(usage_ratio)

    return auto_paging

366 

367 

async def recall_relevant_memories(
    pager: MemoryPager,
    task_description: str,
    limit: int = 3,
) -> list[MemoryItem]:
    """
    Recall memories relevant to a task from swap.

    Keyword extraction proceeds in four steps:
    - split the task description on whitespace;
    - strip surrounding punctuation from each word and lowercase it;
    - keep alphabetic words longer than 3 characters;
    - keep each keyword once, in first-seen order.
    The resulting keywords are used to page in matching swap pages.

    Args:
        pager: Pager whose swap store is searched.
        task_description: Free-text description of the current task.
        limit: Maximum number of pages to recall.

    Returns:
        The memory items restored by ``pager.page_in``.

    Usage:
        items = await recall_relevant_memories(pager, "debug the database connection error")
    """
    words = (w.strip(string.punctuation).lower() for w in task_description.split())
    # dict.fromkeys de-duplicates while preserving order, so a repeated word
    # does not add to a page's search score more than once.
    keywords = list(dict.fromkeys(w for w in words if len(w) > 3 and w.isalpha()))
    return await pager.page_in(keywords, limit=limit)