Coverage for agentos/memory/summarizer.py: 34%
132 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2AgentOS v0.60 Memory Summarizer — 上下文压缩与记忆管理。
3递归摘要 / 重要性评分 / 滑动窗口 / 混合记忆策略。
4"""
6from __future__ import annotations
8import time
9import math
10from dataclasses import dataclass, field
11from enum import Enum
12from typing import Optional, Callable, Any
class MemoryType(str, Enum):
    """Kinds of memory a chunk can represent.

    The enum mixes in ``str``, so every member is also a string that compares
    equal to its plain value (for example ``MemoryType.EPISODIC == "episodic"``).
    """

    EPISODIC = "episodic"      # conversation fragment
    SEMANTIC = "semantic"      # piece of knowledge
    PROCEDURAL = "procedural"  # operating steps
    WORKING = "working"        # current context
@dataclass
class MemoryChunk:
    """A single unit of memory.

    When ``token_estimate`` is left at ``0`` it is derived from the content
    length right after construction (see ``__post_init__``).
    """

    id: str
    content: str
    mtype: MemoryType = MemoryType.EPISODIC
    timestamp: float = field(default_factory=time.time)
    importance: float = 0.5  # 0~1
    access_count: int = 0
    token_estimate: int = 0
    summary: str = ""
    metadata: dict = field(default_factory=dict)

    def __post_init__(self):
        """Fill in ``token_estimate`` when the caller did not supply one."""
        if self.token_estimate != 0:
            return
        # Rough heuristic: one token per three characters, never below 1.
        approx = len(self.content) // 3
        self.token_estimate = approx if approx > 1 else 1
class ImportanceScorer:
    """Multi-dimensional importance scoring for memory chunks.

    The final score is the weighted sum of five per-dimension scores, each in
    ``[0, 1]``, using the weights in ``WEIGHTS``.
    """

    WEIGHTS = {
        "recency": 0.20,           # time decay
        "access_frequency": 0.15,  # how often the chunk has been accessed
        "content_length": 0.10,    # very short = noise, moderate = useful
        "keyword_density": 0.25,   # density of key information
        "task_relevance": 0.30,    # task relevance (supplied by the caller)
    }

    _IMPORTANT_KEYWORDS = [
        "error", "exception", "fail", "critical", "important",
        "key", "decision", "conclusion", "result", "summary",
        "must", "urgent", "deadline", "blocker", "fix",
    ]

    @classmethod
    def score(cls, chunk: MemoryChunk, task_relevance: float = 0.0,
              current_time: float | None = None) -> float:
        """Return the importance of ``chunk`` as a float in ``[0, 1]``.

        Args:
            chunk: Object exposing ``timestamp``, ``access_count``,
                ``token_estimate`` and ``content``.
            task_relevance: Caller-supplied relevance; values outside
                ``[0, 1]`` are clamped into that range.
            current_time: Reference time in seconds. ``None`` means "use
                ``time.time()``"; an explicit ``0.0`` is honoured.

        Returns:
            The weighted score, rounded to four decimals.
        """
        # ``is None`` rather than ``or``: 0.0 is a legitimate reference time.
        now = time.time() if current_time is None else current_time
        scores = {}

        # 1. Recency: exponential decay with a 24h half-life. A timestamp in
        #    the future is treated as age 0 so this dimension never exceeds 1.
        age_hours = max(0.0, (now - chunk.timestamp) / 3600)
        scores["recency"] = math.exp(-age_hours * math.log(2) / 24)

        # 2. Access frequency: saturates at 10 accesses.
        scores["access_frequency"] = min(1.0, max(0.0, chunk.access_count / 10.0))

        # 3. Content length: below 50 tokens ramps up to at most 0.3,
        #    50~2000 tokens scores 1.0, longer content decays (floor 0.1).
        t = chunk.token_estimate
        if t < 50:
            scores["content_length"] = t / 50 * 0.3
        elif t <= 2000:
            scores["content_length"] = 1.0
        else:
            scores["content_length"] = max(0.1, 2000 / t)

        # 4. Keyword density: number of distinct keywords found as substrings
        #    of the lower-cased content; saturates at 5 hits.
        lowered = chunk.content.lower()
        keyword_hits = sum(1 for kw in cls._IMPORTANT_KEYWORDS if kw in lowered)
        scores["keyword_density"] = min(1.0, keyword_hits / 5.0)

        # 5. Task relevance, clamped so one dimension cannot swamp the rest.
        scores["task_relevance"] = min(1.0, max(0.0, task_relevance))

        total = sum(cls.WEIGHTS[k] * scores[k] for k in cls.WEIGHTS)
        return round(min(1.0, max(0.0, total)), 4)
class MemorySummarizer:
    """Memory summarizer: recursive compression, importance ranking and
    sliding-window trimming.
    """

    def __init__(self, max_context_tokens: int = 8000,
                 summarizer_fn: Callable[[str], str] | None = None):
        """Create a summarizer.

        Args:
            max_context_tokens: Token budget that ``recursive_summarize``
                scales by its ``target_ratio``.
            summarizer_fn: Callable turning a text into a shorter text.
                Falls back to ``_default_summarizer`` when omitted.
        """
        self.max_context_tokens = max_context_tokens
        self._summarizer = summarizer_fn or self._default_summarizer
        # Monotonic suffix for summary ids, so ids produced by this instance
        # never repeat across merge rounds or across calls.
        self._summary_seq = 0

    @staticmethod
    def _default_summarizer(text: str) -> str:
        """Default summarizer: first line plus a snippet from the middle.

        Texts with at most three non-blank lines are returned joined by
        single spaces. Longer texts become ``"[N行] <first> ... <middle>"``,
        truncated to 500 characters.
        """
        stripped = (raw.strip() for raw in text.split("\n"))
        lines = [ln for ln in stripped if ln]
        if len(lines) <= 3:
            return " ".join(lines)
        first = lines[0][:200]
        # Take the middle line as a representative sample.
        snippet = lines[len(lines) // 2][:150]
        return f"[{len(lines)}行] {first} ... {snippet}".strip()[:500]

    # ── Recursive summarization ────────────────────────────────────────────

    def recursive_summarize(self, chunks: list[MemoryChunk],
                            target_ratio: float = 0.3) -> list[MemoryChunk]:
        """Merge adjacent chunks pairwise until the token total fits.

        Rounds repeat while the total ``token_estimate`` exceeds
        ``max_context_tokens * target_ratio`` and more than one chunk remains.
        With an odd count the last chunk is carried over unchanged. The input
        list is not modified.

        Each merged chunk takes the later timestamp and the higher importance
        of its two sources, so a summary of old content does not look freshly
        created to recency-based ranking.
        """
        current = list(chunks)
        target_tokens = int(self.max_context_tokens * target_ratio)

        while len(current) > 1 and self.estimate_tokens(current) > target_tokens:
            merged: list[MemoryChunk] = []
            # zip over even/odd slices yields adjacent pairs and drops a
            # trailing unpaired element, which is re-attached below.
            for left, right in zip(current[0::2], current[1::2]):
                merged.append(MemoryChunk(
                    id=f"sum_{self._summary_seq}",
                    content=self._summarizer(left.content + "\n" + right.content),
                    mtype=MemoryType.SEMANTIC,
                    timestamp=max(left.timestamp, right.timestamp),
                    importance=max(left.importance, right.importance),
                ))
                self._summary_seq += 1
            if len(current) % 2 == 1:
                merged.append(current[-1])
            current = merged

        return current

    # ── Importance ranking ─────────────────────────────────────────────────

    def rank_and_prune(self, chunks: list[MemoryChunk],
                       max_chunks: int = 20) -> list[MemoryChunk]:
        """Return up to ``max_chunks`` chunks, highest importance first.

        Scores come from ``ImportanceScorer.score`` with its default task
        relevance. Ties keep their input order. A non-positive ``max_chunks``
        yields an empty list.
        """
        # One shared reference time so every chunk is aged against the same
        # instant.
        now = time.time()
        ranked = sorted(
            chunks,
            key=lambda c: ImportanceScorer.score(c, current_time=now),
            reverse=True,
        )
        return ranked[:max(0, max_chunks)]

    # ── Sliding window ─────────────────────────────────────────────────────

    def sliding_window(self, chunks: list[MemoryChunk],
                       window_size: int = 10) -> list[MemoryChunk]:
        """Return the ``window_size`` most recent chunks, newest first.

        A non-positive ``window_size`` yields an empty list.
        """
        newest_first = sorted(chunks, key=lambda c: c.timestamp, reverse=True)
        return newest_first[:max(0, window_size)]

    # ── Hybrid strategy ────────────────────────────────────────────────────

    def build_context(self, chunks: list[MemoryChunk],
                      strategy: str = "hybrid") -> list[MemoryChunk]:
        """Select the chunks to place in the context.

        Strategies:
            ``"recency"``: the 15 most recent chunks.
            ``"importance"``: the 15 highest-scoring chunks.
            ``"hybrid"``: the 15 highest-scoring chunks, followed by any of
                the 7 most recent chunks not already included (in input
                order). Chunks are de-duplicated by ``id``.

        Any other strategy returns ``chunks`` unchanged.
        """
        if strategy == "recency":
            return self.sliding_window(chunks, 15)
        if strategy == "importance":
            return self.rank_and_prune(chunks, 15)
        if strategy == "hybrid":
            recent_ids = {c.id for c in self.sliding_window(chunks, 7)}
            hybrid: list[MemoryChunk] = []
            seen: set[str] = set()
            for c in self.rank_and_prune(chunks, 15):
                if c.id not in seen:
                    hybrid.append(c)
                    seen.add(c.id)
            for c in chunks:
                if c.id in recent_ids and c.id not in seen:
                    hybrid.append(c)
                    seen.add(c.id)
            return hybrid
        return chunks

    def estimate_tokens(self, chunks: list[MemoryChunk]) -> int:
        """Return the sum of ``token_estimate`` over ``chunks``."""
        return sum(c.token_estimate for c in chunks)
class ConversationMemory:
    """Conversation memory organised by turns, with compression and restore.

    Raw turns that get folded into summaries are kept in a backup, so
    ``restore`` can rebuild the uncompressed conversation.
    """

    def __init__(self, max_turns: int = 50, summarizer: MemorySummarizer | None = None):
        """Create an empty conversation.

        Args:
            max_turns: Turn count above which ``add_turn`` triggers
                ``compress``.
            summarizer: Summarizer used for compression and token counting;
                a default ``MemorySummarizer`` is created when omitted.
        """
        self.max_turns = max_turns
        self.turns: list[MemoryChunk] = []
        self.summarizer = summarizer or MemorySummarizer()
        # Raw turns replaced by summaries, oldest first, across all
        # compressions since the last restore/clear.
        self._backup: list[MemoryChunk] = []
        # Summary chunks currently standing in for ``_backup`` inside
        # ``turns``. Holding the objects keeps their ``id()`` values stable.
        self._summaries: list[MemoryChunk] = []
        # Monotonic turn counter: ``len(self.turns)`` shrinks on compression
        # and would hand out ids that are already in use.
        self._turn_seq = 0

    def add_turn(self, role: str, content: str, metadata: dict | None = None):
        """Append a turn and compress when ``max_turns`` is exceeded.

        The chunk content is ``"[role] content"``. User turns start with
        importance 0.6, all other roles with 0.4.
        """
        chunk = MemoryChunk(
            id=f"turn_{self._turn_seq}",
            content=f"[{role}] {content}",
            mtype=MemoryType.EPISODIC,
            importance=0.6 if role == "user" else 0.4,
            metadata=metadata or {},
        )
        self._turn_seq += 1
        self.turns.append(chunk)
        if len(self.turns) > self.max_turns:
            self.compress()

    def compress(self):
        """Replace the older half of the conversation with summaries.

        Does nothing while the turn count is within ``max_turns``. Raw turns
        that disappear into a summary are appended to the backup. Earlier
        summaries that get merged again are dropped, because their raw turns
        are already backed up. The turn count is not guaranteed to fall to
        ``max_turns`` if the summarizer returns the old half unchanged.
        """
        if len(self.turns) <= self.max_turns:
            return
        split = len(self.turns) // 2
        old_half = self.turns[:split]
        recent = self.turns[split:]
        compressed = self.summarizer.recursive_summarize(old_half, target_ratio=0.2)

        # Identity (not equality) decides what was kept, folded or created.
        old_ids = {id(c) for c in old_half}
        kept_ids = {id(c) for c in compressed}
        known_summary_ids = {id(s) for s in self._summaries}

        self._backup.extend(
            c for c in old_half
            if id(c) not in kept_ids and id(c) not in known_summary_ids
        )
        surviving = [
            s for s in self._summaries
            if id(s) not in old_ids or id(s) in kept_ids
        ]
        created = [c for c in compressed if id(c) not in old_ids]
        self._summaries = surviving + created
        self.turns = compressed + recent

    def clear(self):
        """Drop all turns, the backup, and restart turn numbering."""
        self.turns.clear()
        self._backup.clear()
        self._summaries.clear()
        self._turn_seq = 0

    def restore(self):
        """Rebuild the full conversation from the backup.

        The backed-up raw turns replace the summaries that stood in for
        them, and the remaining raw turns follow. Does nothing when there is
        no backup.
        """
        if self._backup:
            summary_ids = {id(s) for s in self._summaries}
            remaining = [t for t in self.turns if id(t) not in summary_ids]
            self.turns = self._backup + remaining
            self._backup = []
            self._summaries = []

    @property
    def total_tokens(self) -> int:
        """Total estimated tokens over the current turns."""
        return self.summarizer.estimate_tokens(self.turns)