Coverage for agentos/memory/pager.py: 28%
192 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
"""
Virtual Memory Pager — Letta-style context memory swapping.

Inspired by OS virtual memory: when the agent's working context fills up,
the pager evicts ("pages out") old/low-importance episodic memories (and
rarely-accessed working memories) to a persistent swap store, and pages
them back in when they are relevant to the current task.

Architecture:
    L1 (in-memory) ──page_out──→ SwapStore (disk/DB)
    SwapStore      ──page_in───→ L1 (promoted back)

Key features:
- Importance-weighted eviction: least important + oldest first
- Keyword recall: keyword-indexed search in swap for context-relevant pages
- Page batching: group multiple evicted items into pages, optionally summarized by an LLM
- Statistics: track page hits/misses for tuning
"""
19from __future__ import annotations
21import json
22import os
23import time
24import uuid
25from dataclasses import dataclass, field
26from typing import Any, Callable, Optional
28from agentos.memory.pyramid import MemoryItem, MemoryPyramid, MemoryType, MemoryLayer
31# ── Data Structures ──────────────────────────────────────────────
@dataclass
class MemoryPage:
    """One swap page: a batch of evicted memories plus lookup metadata.

    Analogous to a virtual-memory page. ``items`` carries the serialized
    MemoryItem dicts, while ``summary``, ``keywords`` and ``importance_avg``
    let the swap store find and rank the page without decoding its items.
    """
    id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
    items: list[dict[str, Any]] = field(default_factory=list)  # serialized MemoryItem dicts
    summary: str = ""  # LLM-generated summary of page contents
    keywords: list[str] = field(default_factory=list)
    importance_avg: float = 0.0
    item_count: int = 0
    evicted_at: float = field(default_factory=time.time)
    evicted_from: str = "l1"  # which layer it was evicted from

    def to_dict(self) -> dict[str, Any]:
        """Return the page as a plain dict (list fields are shared, not copied)."""
        field_names = (
            "id",
            "items",
            "summary",
            "keywords",
            "importance_avg",
            "item_count",
            "evicted_at",
            "evicted_from",
        )
        return {name: getattr(self, name) for name in field_names}

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "MemoryPage":
        """Rebuild a page from ``to_dict`` output; absent keys get defaults.

        A missing ``id`` gets a fresh 12-hex-char id and a missing
        ``evicted_at`` gets the current time.
        """
        fallback_id = uuid.uuid4().hex[:12]
        fallback_time = time.time()
        return cls(
            id=d.get("id", fallback_id),
            items=d.get("items", []),
            summary=d.get("summary", ""),
            keywords=d.get("keywords", []),
            importance_avg=d.get("importance_avg", 0.0),
            item_count=d.get("item_count", 0),
            evicted_at=d.get("evicted_at", fallback_time),
            evicted_from=d.get("evicted_from", "l1"),
        )
@dataclass
class PagerStats:
    """Running counters describing pager activity, kept for tuning."""
    total_page_outs: int = 0
    total_page_ins: int = 0
    total_items_evicted: int = 0
    total_items_recalled: int = 0
    page_hits: int = 0  # page-in found relevant data
    page_misses: int = 0  # page-in found nothing relevant
    last_page_out_at: float = 0.0
    last_page_in_at: float = 0.0

    @property
    def hit_rate(self) -> float:
        """Fraction of page-in lookups that were hits; 0.0 when none recorded."""
        lookups = self.page_hits + self.page_misses
        if lookups <= 0:
            return 0.0
        return self.page_hits / lookups
82# ── Swap Store Backend ────────────────────────────────────────────
class SwapStore:
    """
    Persistent storage for paged-out memories.

    Default: file-based JSON store. Can be swapped for SQLite/Postgres.

    Pages live in memory (``_pages``) and are mirrored to one JSON file at
    ``path``; every mutation rewrites that file. A lowercase
    keyword → page-id index (``_keyword_index``) backs :meth:`search`.
    """

    def __init__(self, path: str = ""):
        self.path = path or self._default_path()
        self._pages: dict[str, MemoryPage] = {}
        self._keyword_index: dict[str, set[str]] = {}  # keyword → page_ids
        # A bare filename has an empty dirname, and os.makedirs("") raises.
        parent = os.path.dirname(self.path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        self._load()

    @staticmethod
    def _default_path() -> str:
        return os.path.join(os.path.expanduser("~"), ".agentos", "memory_swap.json")

    def store(self, page: MemoryPage) -> None:
        """Add *page* (replacing any page with the same id) and persist."""
        previous = self._pages.get(page.id)
        if previous is not None:
            # Otherwise the old page's keywords would keep pointing at this id.
            self._unindex(previous)
        self._pages[page.id] = page
        self._index(page)
        self._flush()

    def search(self, query_keywords: list[str], limit: int = 5) -> list[MemoryPage]:
        """Keyword-based search for relevant pages.

        Each query keyword (case-insensitive) adds 1.0 to every page indexed
        under it. Pages are ranked by score, then by ``importance_avg``, both
        descending, and at most *limit* pages are returned.
        """
        scores: dict[str, float] = {}
        for kw in query_keywords:
            for page_id in self._keyword_index.get(kw.lower(), ()):
                if page_id in self._pages:  # ignore index entries for missing pages
                    scores[page_id] = scores.get(page_id, 0.0) + 1.0

        ranked = sorted(
            scores,
            key=lambda pid: (scores[pid], self._pages[pid].importance_avg),
            reverse=True,
        )
        return [self._pages[pid] for pid in ranked[:limit]]

    def get(self, page_id: str) -> Optional[MemoryPage]:
        return self._pages.get(page_id)

    def remove(self, page_id: str) -> bool:
        """Delete a page; return True if it existed."""
        page = self._pages.pop(page_id, None)
        if page is None:
            return False
        self._unindex(page)
        self._flush()
        return True

    def list_all(self) -> list[MemoryPage]:
        return list(self._pages.values())

    def clear(self) -> None:
        self._pages.clear()
        self._keyword_index.clear()
        self._flush()

    def _index(self, page: MemoryPage) -> None:
        """Register *page*'s keywords (lowercased) in the keyword index."""
        for kw in page.keywords:
            self._keyword_index.setdefault(kw.lower(), set()).add(page.id)

    def _unindex(self, page: MemoryPage) -> None:
        """Drop *page* from the keyword index, deleting entries left empty."""
        # Dedupe after lowercasing: "Foo" and "foo" share one index entry, and
        # deleting that entry twice would raise KeyError.
        for kw in {k.lower() for k in page.keywords}:
            ids = self._keyword_index.get(kw)
            if ids is None:
                continue
            ids.discard(page.id)
            if not ids:
                del self._keyword_index[kw]

    def _flush(self) -> None:
        """Persist all pages atomically: write a temp file, then rename it.

        Best-effort: a failure is logged and the in-memory state is kept.
        """
        payload = {
            "pages": {k: v.to_dict() for k, v in self._pages.items()},
            "keyword_index": {k: sorted(v) for k, v in self._keyword_index.items()},
        }
        tmp_path = f"{self.path}.tmp"
        try:
            # Serialize first so an unserializable item never leaves a partial file.
            data = json.dumps(payload, indent=2)
            with open(tmp_path, "w", encoding="utf-8") as f:
                f.write(data)
            # os.replace is atomic, so a crash mid-write cannot truncate the
            # existing swap file (which _load would then discard entirely).
            os.replace(tmp_path, self.path)
        except Exception:
            import logging

            logging.getLogger(__name__).warning(
                "Failed to persist memory swap to %s", self.path, exc_info=True
            )

    def _load(self) -> None:
        """Load pages from ``path`` (if present) and rebuild the keyword index."""
        if not os.path.exists(self.path):
            return
        try:
            with open(self.path, encoding="utf-8") as f:
                data = json.load(f)
            pages = {k: MemoryPage.from_dict(v) for k, v in data.get("pages", {}).items()}
        except Exception:
            import logging

            logging.getLogger(__name__).warning(
                "Unreadable memory swap file %s; starting empty", self.path, exc_info=True
            )
            self._pages = {}
            self._keyword_index = {}
            return
        self._pages = pages
        # Rebuild from the pages rather than trusting the persisted index, so
        # the index can never reference pages that are not loaded.
        self._keyword_index = {}
        for page in pages.values():
            self._index(page)
170# ── Memory Pager ──────────────────────────────────────────────────
class MemoryPager:
    """
    Virtual memory pager for Agent context.

    Pages out old/less-important memories to swap when context is full,
    and pages them back in when they match keywords from the current task.

    Usage:
        pager = MemoryPager(pyramid, summarizer_fn=my_summarizer)
        evicted = await pager.page_out(fill_ratio=0.8)  # evict when 80% full
        recalled = await pager.page_in(["python", "error", "debug"])
    """

    def __init__(
        self,
        pyramid: MemoryPyramid,
        summarizer_fn: Optional[Callable] = None,
        swap_store: Optional[SwapStore] = None,
        max_pages: int = 500,
        page_size: int = 10,  # items per page
        eviction_ratio: float = 0.3,  # evict this % of working+episodic candidates when full
    ):
        """
        Args:
            pyramid: Memory pyramid whose EPISODIC/WORKING stores are paged.
            summarizer_fn: Optional ``fn(text) -> str``, sync or async, used to
                summarize each new page. If it raises, a stats summary is used.
            swap_store: Backing store; defaults to a file-based ``SwapStore``.
            max_pages: Cap on pages kept in swap. Beyond it, the least
                important, oldest pages are dropped permanently. A value <= 0
                disables the cap.
            page_size: Number of evicted items batched into one page.
            eviction_ratio: Fraction of eviction candidates paged out at a
                100% full context. It is scaled down linearly between fill
                ratios 0.7 and 1.0.

        Raises:
            ValueError: if ``page_size`` < 1.
        """
        if page_size < 1:
            raise ValueError(f"page_size must be >= 1, got {page_size}")
        self.pyramid = pyramid
        self.summarizer = summarizer_fn
        self.swap = swap_store or SwapStore()
        self.max_pages = max_pages
        self.page_size = page_size
        self.eviction_ratio = eviction_ratio
        self.stats = PagerStats()

    async def page_out(self, fill_ratio: float = 0.85) -> int:
        """
        Evict low-importance episodic+working memories to swap.

        Called when context is approaching the token limit.

        Args:
            fill_ratio: Current context fill ratio (0-1). Below 0.7 nothing
                is evicted.

        Returns:
            Number of items evicted
        """
        if fill_ratio < 0.7:
            return 0  # Not full enough yet

        candidates = self._eviction_candidates()
        if not candidates:
            return 0

        # The fuller the context, the more aggressive: scale goes 0 → 1 as
        # fill_ratio goes 0.7 → 1.0. At least one item is always evicted.
        scale = min(1.0, (fill_ratio - 0.7) / 0.3)
        to_evict_count = max(1, int(len(candidates) * self.eviction_ratio * scale))

        # Least important first; among equal importance, oldest first.
        candidates.sort(key=lambda c: (c[2].importance, c[2].created_at))
        to_evict = candidates[:to_evict_count]

        pages_created = 0
        for start in range(0, len(to_evict), self.page_size):
            batch = to_evict[start:start + self.page_size]
            page = await self._create_page([(key, item) for _, key, item in batch])
            self.swap.store(page)
            # Detach from the pyramid only after the page is in the swap store.
            for mem_type, key, _ in batch:
                self.pyramid._memories[mem_type].pop(key, None)
                self.pyramid._index.pop(key, None)
            pages_created += 1

        self._enforce_max_pages()

        self.stats.total_page_outs += pages_created
        self.stats.total_items_evicted += len(to_evict)
        self.stats.last_page_out_at = time.time()

        return len(to_evict)

    async def page_in(self, query_keywords: list[str], limit: int = 3) -> list[MemoryItem]:
        """
        Search swap for relevant memories and promote them back to L1.

        Restored items go into the EPISODIC store under their original key.
        Each matched page is removed from swap afterwards.

        Args:
            query_keywords: Keywords to search for (e.g., current task description)
            limit: Max pages to recall

        Returns:
            List of MemoryItem that were restored
        """
        pages = self.swap.search(query_keywords, limit=limit)

        if not pages:
            self.stats.page_misses += 1
            return []

        self.stats.page_hits += 1
        self.stats.last_page_in_at = time.time()

        episodic = self.pyramid._memories[MemoryType.EPISODIC]
        restored_items: list[MemoryItem] = []
        for page in pages:
            for item_dict in page.items:
                try:
                    item = MemoryItem.from_dict(item_dict)
                    # _create_page stashed the pyramid key in metadata.
                    key = (item_dict.get("metadata") or {}).get("key", item.id)
                except Exception:
                    continue  # skip undecodable entries rather than abort the recall
                item.layer = MemoryLayer.L1
                episodic[key] = item
                self.pyramid._index[key] = item
                restored_items.append(item)
            # Remove the page from swap (it's back in memory now)
            self.swap.remove(page.id)

        self.stats.total_page_ins += len(pages)
        self.stats.total_items_recalled += len(restored_items)

        return restored_items

    def _eviction_candidates(self) -> list[tuple[MemoryType, str, MemoryItem]]:
        """List ``(source store type, key, item)`` for every evictable memory.

        All episodic memories qualify. Working memories qualify only while
        rarely accessed (``access_count < 3``). The source store is recorded
        explicitly because an item's own ``type`` can differ from the store
        holding it: ``page_in`` puts every restored item into EPISODIC.
        """
        episodic = self.pyramid._memories[MemoryType.EPISODIC]
        working = self.pyramid._memories[MemoryType.WORKING]
        candidates = [(MemoryType.EPISODIC, key, item) for key, item in episodic.items()]
        candidates += [
            (MemoryType.WORKING, key, item)
            for key, item in working.items()
            if item.access_count < 3  # only evict rarely-accessed working memories
        ]
        return candidates

    async def _create_page(self, items: list[tuple[str, MemoryItem]]) -> MemoryPage:
        """Build a swap page from ``(key, item)`` pairs. The pyramid is untouched."""
        strip_chars = ".,;:!?\"'()[]{}"
        page = MemoryPage()
        keywords: set[str] = set()
        total_imp = 0.0

        for key, item in items:
            # Keep the pyramid key in metadata so page_in can restore it.
            page.items.append({**item.to_dict(), "metadata": {**item.metadata, "key": key}})

            # Keywords: first 20 content words (punctuation stripped) and
            # short string metadata values.
            if isinstance(item.content, str):
                for raw in item.content.lower().split()[:20]:
                    word = raw.strip(strip_chars)
                    if len(word) > 3 and word.isalpha():
                        keywords.add(word)
            for v in item.metadata.values():
                if isinstance(v, str) and len(v) < 50:
                    keywords.add(v.lower())

            total_imp += item.importance

        page.item_count = len(items)
        page.importance_avg = total_imp / len(items) if items else 0.0
        page.keywords = sorted(keywords)

        # Generate summary via LLM if available
        if self.summarizer and page.items:
            contents = "\n".join(
                d["content"] for d in page.items if isinstance(d.get("content"), str)
            )[:2000]
            try:
                summary = self.summarizer(contents)
                # Accept both plain functions and coroutine functions.
                if hasattr(summary, "__await__"):
                    summary = await summary
                page.summary = summary
            except Exception:
                page.summary = f"{len(items)} memories, avg importance {page.importance_avg:.2f}"

        return page

    def _enforce_max_pages(self) -> None:
        """Permanently drop the least important, oldest pages beyond ``max_pages``."""
        if self.max_pages <= 0:
            return
        pages = self.swap.list_all()
        overflow = len(pages) - self.max_pages
        if overflow <= 0:
            return
        pages.sort(key=lambda p: (p.importance_avg, p.evicted_at))
        for page in pages[:overflow]:
            self.swap.remove(page.id)

    def get_stats(self) -> dict[str, Any]:
        """Get comprehensive pager statistics."""
        return {
            "total_page_outs": self.stats.total_page_outs,
            "total_page_ins": self.stats.total_page_ins,
            "total_items_evicted": self.stats.total_items_evicted,
            "total_items_recalled": self.stats.total_items_recalled,
            "page_hits": self.stats.page_hits,
            "page_misses": self.stats.page_misses,
            "hit_rate": f"{self.stats.hit_rate:.1%}",
            "swap_pages_stored": len(self.swap.list_all()),
            "last_page_out": self.stats.last_page_out_at,
            "last_page_in": self.stats.last_page_in_at,
        }
352# ── Loop Integration Helper ────────────────────────────────────────
def create_paging_callback(pager: MemoryPager) -> Callable:
    """
    Build the coroutine function the agent loop calls for auto-paging.

    The returned ``auto_paging(usage_ratio)`` forwards the loop's context
    usage ratio to ``pager.page_out`` and returns the number of evicted items.

    Usage:
        pager = MemoryPager(pyramid)
        loop.set_auto_paging(create_paging_callback(pager))
    """
    async def auto_paging(usage_ratio: float) -> int:
        return await pager.page_out(usage_ratio)

    return auto_paging
async def recall_relevant_memories(
    pager: MemoryPager,
    task_description: str,
    limit: int = 3,
) -> list[MemoryItem]:
    """
    Recall memories relevant to a task from swap.

    Extracts keywords from the task description and pages in the matching
    swap pages. A keyword is an alphabetic word longer than 3 characters,
    lowercased, with surrounding punctuation stripped. Duplicates are dropped
    so a repeated word does not inflate a page's search score.

    Usage:
        items = await recall_relevant_memories(pager, "debug the database connection error")
    """
    words = (raw.strip(".,;:!?\"'()[]{}").lower() for raw in task_description.split())
    # dict.fromkeys dedupes while preserving first-seen order.
    keywords = list(dict.fromkeys(w for w in words if len(w) > 3 and w.isalpha()))
    return await pager.page_in(keywords, limit=limit)


# NOTE: do not (re)add a synchronous stub named `recall_relevant_memories`
# here; a later `def` with the same name would shadow the coroutine above.