Coverage for agentos/memory/summarizer.py: 34%

132 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2AgentOS v0.60 Memory Summarizer — 上下文压缩与记忆管理。 

3递归摘要 / 重要性评分 / 滑动窗口 / 混合记忆策略。 

4""" 

5 

6from __future__ import annotations 

7 

8import time 

9import math 

10from dataclasses import dataclass, field 

11from enum import Enum 

12from typing import Optional, Callable, Any 

13 

14 

class MemoryType(str, Enum):
    """Kinds of memory a chunk can represent.

    The enum mixes in ``str``, so each member compares equal to its plain
    string value.
    """

    # A fragment of conversation.
    EPISODIC = "episodic"
    # A piece of knowledge.
    SEMANTIC = "semantic"
    # A sequence of operating steps.
    PROCEDURAL = "procedural"
    # The context currently in use.
    WORKING = "working"

23 

24 

@dataclass
class MemoryChunk:
    """A single unit of stored memory.

    If ``token_estimate`` is left at 0, it is derived from the length of
    ``content`` right after construction.
    """

    id: str
    content: str
    mtype: MemoryType = MemoryType.EPISODIC
    timestamp: float = field(default_factory=time.time)
    importance: float = 0.5  # intended range: 0..1
    access_count: int = 0
    token_estimate: int = 0
    summary: str = ""
    metadata: dict = field(default_factory=dict)

    def __post_init__(self):
        # A non-zero estimate supplied by the caller is kept untouched.
        if self.token_estimate != 0:
            return
        # Rough heuristic: one token per three characters, never below 1.
        approx = len(self.content) // 3
        self.token_estimate = approx if approx > 1 else 1

41 

42 

class ImportanceScorer:
    """Score a memory chunk's importance as a weighted sum of five signals.

    Each signal is normalised to 0..1 before it is weighted. The weights add
    up to 1.0, so the final score also lies in 0..1.
    """

    # Relative weight of each signal in the final score.
    WEIGHTS = {
        "recency": 0.20,           # exponential time decay
        "access_frequency": 0.15,  # how often the chunk was accessed
        "content_length": 0.10,    # very short = noise, moderate = useful
        "keyword_density": 0.25,   # density of key information
        "task_relevance": 0.30,    # supplied by the caller
    }

    # Substrings whose presence marks a chunk as carrying key information.
    _IMPORTANT_KEYWORDS = [
        "error", "exception", "fail", "critical", "important",
        "key", "decision", "conclusion", "result", "summary",
        "must", "urgent", "deadline", "blocker", "fix",
    ]

    @staticmethod
    def _clamp01(value: float) -> float:
        """Limit *value* to the closed interval 0..1."""
        return min(1.0, max(0.0, value))

    @classmethod
    def score(cls, chunk: MemoryChunk, task_relevance: float = 0.0,
              current_time: float | None = None) -> float:
        """Return the importance of *chunk*, rounded to four decimals.

        Args:
            chunk: Object exposing ``timestamp``, ``access_count``,
                ``token_estimate`` and ``content``.
            task_relevance: Caller-supplied relevance; values outside 0..1
                are clamped into that range.
            current_time: Reference time in seconds. ``None`` means "now"
                (``time.time()``). An explicit ``0.0`` is honoured.

        Returns:
            A score in 0..1.
        """
        # Compare against None so an explicit 0.0 is not replaced by "now".
        now = time.time() if current_time is None else current_time
        signals = {}

        # 1. Recency: exponential decay with a 24-hour half-life. A timestamp
        #    in the future is treated as age 0, which keeps the signal at or
        #    below 1 and avoids math.exp overflowing on large negative ages.
        age_hours = max(0.0, (now - chunk.timestamp) / 3600)
        signals["recency"] = math.exp(-age_hours * math.log(2) / 24)

        # 2. Access frequency: saturates at 10 accesses.
        signals["access_frequency"] = cls._clamp01(chunk.access_count / 10.0)

        # 3. Content length: below 50 tokens ramps up to 0.3, 50..2000 tokens
        #    scores 1.0, longer content decays towards a floor of 0.1.
        tokens = chunk.token_estimate
        if tokens < 50:
            length_score = tokens / 50 * 0.3
        elif tokens <= 2000:
            length_score = 1.0
        else:
            length_score = max(0.1, 2000 / tokens)
        signals["content_length"] = cls._clamp01(length_score)

        # 4. Keyword density: number of distinct keywords found as
        #    case-insensitive substrings, saturating at 5 hits.
        lowered = chunk.content.lower()
        keyword_hits = sum(1 for kw in cls._IMPORTANT_KEYWORDS if kw in lowered)
        signals["keyword_density"] = min(1.0, keyword_hits / 5.0)

        # 5. Task relevance: clamped so one out-of-range input cannot
        #    dominate or invert the weighted sum.
        signals["task_relevance"] = cls._clamp01(task_relevance)

        total = sum(cls.WEIGHTS[name] * signals[name] for name in cls.WEIGHTS)
        return round(cls._clamp01(total), 4)

92 

93 

class MemorySummarizer:
    """Compress and select memory chunks so they fit a context budget.

    Provides recursive pairwise summarisation, importance ranking, a
    most-recent sliding window, and a hybrid of ranking plus window.
    """

    def __init__(self, max_context_tokens: int = 8000,
                 summarizer_fn: Callable[[str], str] | None = None):
        """Store the token budget and the text-summarising callable.

        Args:
            max_context_tokens: Budget that ``recursive_summarize`` scales by
                its ``target_ratio``.
            summarizer_fn: Callable turning a text into a shorter text.
                Falls back to ``_default_summarizer`` when omitted.
        """
        self.max_context_tokens = max_context_tokens
        self._summarizer = summarizer_fn or self._default_summarizer

    @staticmethod
    def _default_summarizer(text: str) -> str:
        """Summarise *text* as its first line plus its middle line.

        Texts with at most three non-blank lines are returned joined by
        single spaces. Longer texts are capped at 500 characters.
        """
        lines = [ln.strip() for ln in text.split("\n") if ln.strip()]
        if len(lines) <= 3:
            return " ".join(lines)
        first = lines[0][:200]
        # The middle line is used as a representative snippet.
        snippet = lines[len(lines) // 2][:150]
        return f"[{len(lines)}行] {first} ... {snippet}".strip()[:500]

    # -- Recursive summarisation ------------------------------------------

    def recursive_summarize(self, chunks: list[MemoryChunk],
                            target_ratio: float = 0.3) -> list[MemoryChunk]:
        """Merge adjacent chunks pairwise until the token budget is met.

        Each round summarises neighbouring pairs, roughly halving the list.
        Rounds repeat until the total token estimate is at most
        ``max_context_tokens * target_ratio`` or one chunk remains.

        Args:
            chunks: Chunks in the order they should be merged. The list is
                not modified.
            target_ratio: Fraction of ``max_context_tokens`` to compress to.

        Returns:
            A new list. Chunks never merged are the original objects.

        Note:
            Summary ids are ``sum_<index>`` within a round, so they are
            positional and not globally unique.
        """
        current = list(chunks)
        target_tokens = int(self.max_context_tokens * target_ratio)

        while len(current) > 1 and self.estimate_tokens(current) > target_tokens:
            merged: list[MemoryChunk] = []
            for i in range(0, len(current) - 1, 2):
                left, right = current[i], current[i + 1]
                merged.append(MemoryChunk(
                    id=f"sum_{i}",
                    content=self._summarizer(left.content + "\n" + right.content),
                    mtype=MemoryType.SEMANTIC,
                    # Keep the time of the newest source. A fresh creation
                    # time would make summaries of old content look like the
                    # most recent memories to sliding_window and scoring.
                    timestamp=max(left.timestamp, right.timestamp),
                    importance=max(left.importance, right.importance),
                ))
            # An odd trailing chunk is carried over unmerged.
            if len(current) % 2 == 1:
                merged.append(current[-1])
            current = merged

        return current

    # -- Importance ranking -------------------------------------------------

    def rank_and_prune(self, chunks: list[MemoryChunk],
                       max_chunks: int = 20) -> list[MemoryChunk]:
        """Return up to *max_chunks* chunks, highest importance first.

        Chunks with equal scores keep their input order. A non-positive
        *max_chunks* yields an empty list.
        """
        if max_chunks <= 0:
            return []
        ranked = sorted(chunks, key=ImportanceScorer.score, reverse=True)
        return ranked[:max_chunks]

    # -- Sliding window -----------------------------------------------------

    def sliding_window(self, chunks: list[MemoryChunk],
                       window_size: int = 10) -> list[MemoryChunk]:
        """Return the *window_size* newest chunks, newest first.

        A non-positive *window_size* yields an empty list.
        """
        if window_size <= 0:
            return []
        newest_first = sorted(chunks, key=lambda c: c.timestamp, reverse=True)
        return newest_first[:window_size]

    # -- Hybrid strategy ----------------------------------------------------

    def build_context(self, chunks: list[MemoryChunk],
                      strategy: str = "hybrid", *,
                      max_chunks: int = 15,
                      window_size: int | None = None) -> list[MemoryChunk]:
        """Select the chunks to place in the context.

        Args:
            chunks: Candidate chunks.
            strategy: ``"recency"`` (newest chunks), ``"importance"``
                (highest scored chunks) or ``"hybrid"`` (highest scored
                chunks followed by recent ones not already included). Any
                other value returns *chunks* unchanged.
            max_chunks: Limit for the importance ranking.
            window_size: Size of the recency window. ``None`` keeps the
                defaults: 15 for ``"recency"``, 7 for ``"hybrid"``.

        Returns:
            The selected chunks. The hybrid result is de-duplicated by id.
        """
        if strategy == "recency":
            return self.sliding_window(
                chunks, 15 if window_size is None else window_size)
        if strategy == "importance":
            return self.rank_and_prune(chunks, max_chunks)
        if strategy == "hybrid":
            hybrid_window = 7 if window_size is None else window_size
            recent_ids = {c.id for c in self.sliding_window(chunks, hybrid_window)}
            selected: list[MemoryChunk] = []
            seen: set[str] = set()
            for c in self.rank_and_prune(chunks, max_chunks):
                if c.id not in seen:
                    selected.append(c)
                    seen.add(c.id)
            # Recent chunks are appended in their input order.
            for c in chunks:
                if c.id in recent_ids and c.id not in seen:
                    selected.append(c)
                    seen.add(c.id)
            return selected
        return chunks

    def estimate_tokens(self, chunks: list[MemoryChunk]) -> int:
        """Return the sum of the chunks' token estimates."""
        return sum(c.token_estimate for c in chunks)

186 

187 

class ConversationMemory:
    """Conversation memory organised by turn, with compression and restore.

    ``turns`` holds the live conversation. After compression its first
    ``_summary_len`` entries stand in for the original chunks kept in
    ``_backup``. The backup grows with every compression until ``restore``
    or ``clear`` is called.
    """

    def __init__(self, max_turns: int = 50, summarizer: MemorySummarizer | None = None):
        """Create an empty memory.

        Args:
            max_turns: Turn count above which ``add_turn`` triggers
                compression.
            summarizer: Summarizer used for compression and token counts.
                A default ``MemorySummarizer`` is created when omitted.
        """
        self.max_turns = max_turns
        self.turns: list[MemoryChunk] = []
        self.summarizer = MemorySummarizer() if summarizer is None else summarizer
        # Originals replaced by the leading summaries in ``turns``.
        self._backup: list[MemoryChunk] = []
        # Number of leading entries in ``turns`` covered by ``_backup``.
        self._summary_len = 0
        # Monotonic counter, so ids stay unique after ``turns`` shrinks.
        self._next_turn_id = 0

    def add_turn(self, role: str, content: str, metadata: dict | None = None):
        """Append one turn and compress when ``max_turns`` is exceeded.

        Args:
            role: Speaker label. ``"user"`` turns get importance 0.6, all
                others 0.4.
            content: Text of the turn, stored as ``"[role] content"``.
            metadata: Optional metadata attached to the chunk.
        """
        chunk = MemoryChunk(
            id=f"turn_{self._next_turn_id}",
            content=f"[{role}] {content}",
            mtype=MemoryType.EPISODIC,
            importance=0.6 if role == "user" else 0.4,
            metadata=metadata or {},
        )
        self._next_turn_id += 1
        self.turns.append(chunk)
        if len(self.turns) > self.max_turns:
            self.compress()

    def compress(self):
        """Summarise the older half of the conversation.

        Does nothing while the turn count is within ``max_turns``. Chunks
        summarised for the first time are added to the backup, so a later
        ``restore`` can bring back the whole conversation.
        """
        if len(self.turns) <= self.max_turns:
            return
        split = len(self.turns) // 2
        old_half = self.turns[:split]
        compressed = self.summarizer.recursive_summarize(old_half, target_ratio=0.2)
        # Entries past the existing summaries are originals not yet backed
        # up. Earlier summaries are already represented in the backup.
        self._backup = self._backup + old_half[self._summary_len:]
        # If only part of the old summaries was re-compressed, the remaining
        # ones still stand in for backed-up originals.
        self._summary_len = len(compressed) + max(0, self._summary_len - split)
        self.turns = compressed + self.turns[split:]

    def clear(self):
        """Drop all turns and the backup, and restart turn numbering."""
        self.turns.clear()
        self._backup = []
        self._summary_len = 0
        self._next_turn_id = 0

    def restore(self):
        """Restore the full conversation from the backup.

        The leading summaries are replaced by the originals they stand for.
        Without a backup this is a no-op.
        """
        if not self._backup:
            return
        self.turns = self._backup + self.turns[self._summary_len:]
        self._backup = []
        self._summary_len = 0

    @property
    def total_tokens(self) -> int:
        """Total token estimate of the current turns."""
        return self.summarizer.estimate_tokens(self.turns)