Coverage for memory / compressor.py: 18%
85 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-29 02:55 +0800
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-29 02:55 +0800
1"""
2上下文压缩模块。
4压缩策略:递归摘要 + 目标范围控制
5- 目标范围:压缩后(摘要 + 短期记忆)占上下文窗口的 20%~25%
6- 保留窗口:从最新消息往前选,只限制 tokens(不限制条数)
7- 递归摘要:检测旧摘要,有则合并一起压缩,保证信息不丢失
9Token 计算:使用 tiktoken 精确计算,支持 GPT-4o 等模型
10"""
11import tiktoken
12from qrclaw.config import (
13 OPENAI_MODEL,
14 COMPRESS_SUMMARY_MAX_TOKENS, COMPRESS_SUMMARY_TARGET_TOKENS,
15 COMPRESS_RECENT_MAX_TOKENS,
16 COMPRESS_TARGET_MIN_RATIO, COMPRESS_TARGET_MAX_RATIO,
17 _MODEL_MAX_TOKENS,
18)
19from qrclaw.logger import get_logger
21logger = get_logger("qrclaw.memory.compressor")
23# 初始化 tiktoken encoder
24# 对于未知模型,fallback 到 cl100k_base(GPT-4/4o 使用)
25try:
26 _encoding = tiktoken.encoding_for_model(OPENAI_MODEL)
27except KeyError:
28 _encoding = tiktoken.get_encoding("cl100k_base")
29 logger.debug(f"模型 {OPENAI_MODEL} 无对应 encoder,使用 cl100k_base")
32def count_tokens(messages: list[dict]) -> int:
33 """
34 精确计算消息列表的 token 数。
36 Args:
37 messages: OpenAI 格式的消息列表
39 Returns:
40 int: token 总数
41 """
42 tokens = 0
43 for msg in messages:
44 # 每条消息有固定开销
45 tokens += 4 # {"role": "...", "content": "..."} 格式开销
46 for key, value in msg.items():
47 if value is not None:
48 tokens += len(_encoding.encode(str(value)))
49 tokens += 2 # 对话开销
50 return tokens
53def count_text_tokens(text: str) -> int:
54 """
55 精确计算文本的 token 数。
57 Args:
58 text: 文本内容
60 Returns:
61 int: token 数
62 """
63 return len(_encoding.encode(text))
66SUMMARIZE_PROMPT = """请把下面的对话内容整理成结构化摘要,要求:
671. 按以下分类输出,没有内容的分类可以省略
682. 保留所有关键信息,不要遗漏重要细节
693. 语言简洁,不要废话
704. 控制摘要长度,目标约 {target_tokens} tokens
72格式:
73【用户信息】用户的基本信息、身份、偏好等
74【已完成任务】已经完成的操作和结果
75【关键结论】重要的决策、发现、约定
76【待处理事项】还没完成的任务
78对话内容:
79{history}
80"""
83def _pick_recent(messages: list[dict], max_tokens: int = None) -> tuple[list[dict], list[dict]]:
84 """
85 从最新消息往前选,只限制 tokens(不限制条数)。
87 返回 (recent, old):recent 是保留的,old 是要压缩的。
88 """
89 if max_tokens is None:
90 max_tokens = COMPRESS_RECENT_MAX_TOKENS
92 recent = []
93 token_count = 0
95 for msg in reversed(messages):
96 t = count_tokens([msg])
97 if token_count + t > max_tokens:
98 break
99 recent.insert(0, msg)
100 token_count += t
102 old = messages[:len(messages) - len(recent)]
103 logger.debug(f"保留窗口: {len(recent)} 条, {token_count} tokens; 待压缩: {len(old)} 条")
104 return recent, old
107def summarize(session) -> None:
108 """
109 递归摘要压缩:
110 1. 从最新消息往前选,只限制 tokens
111 2. 检测是否有旧摘要,有则合并进待压缩内容(递归摘要)
112 3. 调用 LLM 生成新摘要
113 4. 检查总长度是否在 20%~25% 范围内,必要时调整
114 """
115 logger.info("开始压缩历史消息")
117 # 计算目标范围
118 target_min = int(_MODEL_MAX_TOKENS * COMPRESS_TARGET_MIN_RATIO)
119 target_max = int(_MODEL_MAX_TOKENS * COMPRESS_TARGET_MAX_RATIO)
120 logger.debug(f"目标范围: {target_min} ~ {target_max} tokens ({COMPRESS_TARGET_MIN_RATIO*100:.0f}% ~ {COMPRESS_TARGET_MAX_RATIO*100:.0f}%)")
122 recent, old = _pick_recent(session.messages)
124 if not old:
125 logger.warning("没有旧消息可压缩,跳过")
126 return
128 # 检测 old 里是否有旧摘要(递归摘要:旧摘要一起参与本次压缩)
129 has_old_summary = any(
130 msg.get("role") == "assistant" and str(msg.get("content", "")).startswith("[SUMMARY]")
131 for msg in old
132 )
133 if has_old_summary:
134 logger.info("检测到旧摘要,合并进行递归压缩")
136 history_text = "\n\n".join(
137 f"[{msg.get('role', 'unknown')}]: {msg.get('content', '')}"
138 for msg in old
139 if msg.get("content")
140 )
142 try:
143 from qrclaw.providers import provider
144 logger.debug(f"调用 LLM 生成摘要,目标: {COMPRESS_SUMMARY_TARGET_TOKENS} tokens")
145 resp = provider.chat([
146 {"role": "user", "content": SUMMARIZE_PROMPT.format(
147 history=history_text,
148 target_tokens=COMPRESS_SUMMARY_TARGET_TOKENS,
149 )}
150 ])
151 summary = resp.content
152 summary_tokens = count_text_tokens(summary)
154 logger.info(f"摘要生成成功,长度: {len(summary)} 字符,{summary_tokens} tokens")
155 logger.debug(f"摘要内容预览: {summary[:200]}...")
157 # 计算短期记忆的 token 数
158 recent_tokens = count_tokens(recent)
159 total_tokens = summary_tokens + recent_tokens
161 logger.info(f"压缩后总长度: {total_tokens} tokens (摘要 {summary_tokens} + 短期记忆 {recent_tokens})")
163 # 检查是否在目标范围内
164 if total_tokens > target_max:
165 logger.warning(f"压缩后 {total_tokens} tokens 超过上限 {target_max},减少短期记忆")
166 # 减少短期记忆的 token 预算
167 available_for_recent = target_max - summary_tokens
168 if available_for_recent > 0:
169 recent, old = _pick_recent(session.messages, max_tokens=available_for_recent)
170 recent_tokens = count_tokens(recent)
171 total_tokens = summary_tokens + recent_tokens
172 logger.info(f"调整后: {total_tokens} tokens (摘要 {summary_tokens} + 短期记忆 {recent_tokens})")
173 else:
174 logger.warning("摘要本身已超过上限,保持现状")
176 elif total_tokens < target_min:
177 logger.info(f"压缩后 {total_tokens} tokens 低于下限 {target_min},但这是好事,保持现状")
179 # 重建 session.messages
180 session.messages = [
181 {"role": "assistant", "content": f"[SUMMARY] 以下是之前对话的结构化摘要:\n{summary}"},
182 *recent,
183 ]
184 session._save()
186 ratio = total_tokens / _MODEL_MAX_TOKENS * 100
187 logger.info(f"压缩完成,消息数: {len(session.messages)},占比: {ratio:.1f}%")
188 print(f" [系统] 上下文压缩完成,占比 {ratio:.1f}%")
190 except Exception as e:
191 logger.error(f"压缩失败: {e}", exc_info=True)
192 raise
195def truncate(session, keep: int = 20) -> None:
196 """
197 滚动截断策略:直接丢弃旧消息,只保留最近 keep 条。
198 最简单,但会丢失早期信息。
199 """
200 if len(session.messages) > keep:
201 old_count = len(session.messages)
202 session.messages = session.messages[-keep:]
203 session._save()
204 logger.info(f"滚动截断完成,保留最近 {keep} 条消息 (丢弃 {old_count - keep} 条)")
205 else:
206 logger.debug(f"消息数未超过阈值 ({len(session.messages)}/{keep}),无需截断")